RabbitMQ在Java中使用 SpringBoot 从基础到高级

news2024/9/22 23:27:59

充分利用每一个监听者

需要充分利用每一个消费者,需要在配置文件中加上prefetch配置并设置为1

rabbitmq:
  listener:
    simple:
      prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

创建交换机和队列

创建队列

  1. "fanout.queue1":队列的名称,这里是 “fanout.queue1”。
  2. false:指示队列是否是持久化的。如果设置为 true,则表示队列会在 RabbitMQ 服务器重启后仍然存在。如果设置为 false,则表示队列是非持久化的,即在 RabbitMQ 服务器重启后会被删除。
  3. false:指示队列是否是独占的。如果设置为 true,则表示只有声明该队列的连接可以使用它。如果设置为 false,则表示其他连接也可以使用该队列。
  4. true:指示队列是否会自动删除。如果设置为 true,则表示当没有消费者连接到该队列时,队列会自动删除。如果设置为 false,则表示队列不会自动删除。
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CreateFanoutQueue {
 	@Bean
    public Queue fanoutQueue1() {
        QueueBuilder.durable("fanout").build();
        return new Queue("fanout.queue1", false, false, true);
    }
}

创建交换机

  1. name:交换机的名称。
  2. durable:指示交换机是否是持久化的。如果设置为 true,则表示 RabbitMQ 服务器重启后仍然存在。如果设置为 false,则表示交换机是非持久化的,即在 RabbitMQ 服务器重启后会被删除。在你的代码中,durable 参数被设置为 false,表示这个交换机是非持久化的。
  3. autoDelete:指示交换机是否是自动删除的。如果设置为 true,则表示当没有与之绑定的队列时,交换机会自动删除。如果设置为 false,则表示交换机不会自动删除。在你的代码中,autoDelete 参数被设置为 false,表示这个交换机不会自动删除。
@Configuration
public class CreateFanoutExchange {

    /**
     * 创建永久交换机,必须要设置是否自动删除
     *
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange2() {
        return new FanoutExchange("bunny.fanout2", false, false);
    }
}

注解创建交换机和队列

  • bindings:用于定义队列和交换机之间的绑定关系。在你的代码中,通过 @QueueBinding 注解来定义了一个队列绑定。该绑定包括一个名为 “fanout.queue3” 的队列和一个名为 “bunny.fanout” 的交换机之间的绑定关系。
  • value:用于指定队列的属性。在你的代码中,通过 @Queue 注解来指定了队列的名称为 “fanout.queue3”,并设置了 durable = "true",表示该队列是持久化的。
  • exchange:用于指定交换机的属性。在你的代码中,通过 @Exchange 注解来指定了交换机的名称为 “bunny.fanout”,类型为 ExchangeTypes.FANOUT,并设置了 durable = "true",表示该交换机是持久化的。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class FanoutListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.queue3", durable = "true"),
            exchange = @Exchange(name = "bunny.fanout", type = ExchangeTypes.FANOUT, durable = "true")
    ))
    public void listenFanoutQueue3(String message) {
        System.out.println("消费者3接收到Fanout消息:【" + message + "】");
    }
}

交换机类型与作用

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是 Fanout 交换机
  • Direct:订阅,基于 RoutingKey(路由 key)发送给订阅了消息的队列
  • Topic:通配符订阅,与 Direct 类似,只不过 RoutingKey 可以使用通配符

Fanout交换机

[!important]

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

无特殊功能,当队列发送消息和接受消息时,只能发送到交换机, 交换机把消息发送给绑定过的所有队列, 订阅队列的消费者都能拿到消息。

当我们给交换机绑定了三个队列,这三个队列收到消息即可完成消息监听和发送。

使用方式
  1. 创建两个队列,分别为fanout.queue1fanout.queue2

在这里插入图片描述

  1. 创建bunny.fannout交换机。

在这里插入图片描述

  1. 绑定两个队列到交换机中。

在这里插入图片描述

Java创建队列和交换机
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class FanoutListener {
    /**
     * 监听 消费者1 是否收到消息
     *
     * @param message 消息
     */
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String message) {
        System.out.println("消费者1接收到Fanout消息:【" + message + "】");
    }

    /**
     * 监听 消费者2 是否收到消息
     *
     * @param message 消息
     */
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String message) {
        System.out.println("消费者2接收到Fanout消息:【" + message + "】");
    }

    /**
     * 监听 消费者2 是否收到消息
     *
     * @param message 消息
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.queue3", durable = "true"),
            exchange = @Exchange(name = "bunny.fanout", type = ExchangeTypes.FANOUT, durable = "true")
    ))
    public void listenFanoutQueue3(String message) {
        System.out.println("消费者3接收到Fanout消息:【" + message + "】");
    }
}
Java发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TestSendFanout {
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 向队列发送消息 fanout.queue1
     */
    @Test
    void testSendFanout1() throws Exception {
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("bunny.fanout", null, "第二个消息队列:" + i);
        }
    }
}

在这里插入图片描述

Direct交换机

使用方式

[!important]

Direct交换机与Fanout交换机的差异

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Direct交换机发送消息时根据路由的key来发送的,而Fanout交换机是广播发送不设置路由的key

在这里插入图片描述

在这里插入图片描述

Java创建
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class DirectListener {
    /**
     * * 监听者1
     * 创建队列 持久化的、不自动删除
     * 创建交换机 持久化的、不自动删除
     * 无接受的key
     *
     * @param message 接受消息
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1", durable = "true", autoDelete = "false"),
            exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false")
    ))
    public void listenDirectQueue1(String message) {
        System.out.println("消费者1接收到 Direct 消息:【" + message + "】");
    }

    /**
     * * 监听者2
     * 创建队列 持久化的、不自动删除
     * 创建交换机 持久化的、不自动删除
     * key包含 red 和 yellow
     *
     * @param message 接受消息
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2", durable = "true", autoDelete = "false"),
            exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String message) {
        System.out.println("消费者2接收到 Direct key 为 {\"red\", \"yellow\"} 消息:【" + message + "】");
    }

    /**
     * * 监听者3
     * 创建队列 持久化的、不自动删除
     * 创建交换机 持久化的、不自动删除
     * key包含 blue 和 yellow
     *
     * @param message 接受消息
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue3", durable = "true", autoDelete = "false"),
            exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),
            key = {"blue", "yellow"}
    ))
    public void listenDirectQueue3(String message) {
        System.out.println("消费者2接收到 Direct key 为 {\"blue\", \"yellow\"} 消息:【" + message + "】");
    }

    /**
     * * 监听者4
     * 创建队列 持久化的、不自动删除
     * 创建交换机 持久化的、不自动删除
     * key包含 yellow
     *
     * @param message 接受消息
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue4", durable = "true", autoDelete = "false"),
            exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),
            key = "yellow"
    ))
    public void listenDirectQueue4(String message) {
        System.out.println("消费者2接收到 Direct key 为 \"yellow\" 消息:【" + message + "】");
    }
}
Java发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TestSendDirect {
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 发送黄色消息
     */
    @Test
    void testSendDirectYellow() throws Exception {
        for (int i = 0; i < 1000; i++) {
            rabbitTemplate.convertAndSend("bunny.direct", "yellow", "发送消息:" + i);
        }
    }

    /**
     * 发送红色消息
     */
    @Test
    void testSendDirectRed() throws Exception {
        for (int i = 0; i < 1000; i++) {
            rabbitTemplate.convertAndSend("bunny.direct", "red", "发送消息:" + i);
        }
    }

    /**
     * 发送蓝色消息
     */
    @Test
    void testSendDirectBlue() throws Exception {
        for (int i = 0; i < 1000; i++) {
            rabbitTemplate.convertAndSend("bunny.direct", "blue", "发送消息:" + i);
        }
    }
}

由于消费者1没有绑定任何key所以任何消息都没有接受到。

在这里插入图片描述

Topic交换机

使用方式

[!important]

Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

在这里插入图片描述

在这里插入图片描述

Java创建
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TopicListener {
    /**
     * * 监听者1
     * 创建队列 持久化的、不自动删除
     * 创建交换机 持久化的、不自动删除
     * key为china.#
     *
     * @param message 接受消息
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "bunny.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicChina(String message) {
        System.out.println("消费者1接收到topic.queue1的消息:【" + message + "】");
    }

    /**
     * * 监听者1
     * 创建队列 持久化的、不自动删除
     * 创建交换机 持久化的、不自动删除
     * #.news
     *
     * @param message 接受消息
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "bunny.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicNews(String message) {
        System.out.println("消费者1接收到topic.queue2的消息:【" + message + "】");
    }
}
Java发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TestSendTopic {
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 发送消息 key 包含 china.# 的
     */
    @Test
    void testSendTopic1() throws Exception {
        for (int i = 0; i < 1000; i++) {
            rabbitTemplate.convertAndSend("bunny.topic", "china.ly", "china.ly 发送消息:" + i);
        }
    }

    /**
     * 发送消息 key 包含 #.news 的
     */
    @Test
    void testSendTopic2() {
        for (int i = 0; i < 1000; i++) {
            rabbitTemplate.convertAndSend("bunny.topic", "ly.news", "ly.news 发送消息:" + i);
        }
    }
}

在这里插入图片描述
在这里插入图片描述

消息持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化(在交换机类型与作用已解释)
  • 队列持久化(在交换机类型与作用已解释)
  • 消息持久化(本文介绍)

使用MessageBuilder创建使用调用方法setDeliveryMode,在里面设置持久化消息和非持久化消息

非持久化消息

setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)设置非持久化消息。

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.nio.charset.StandardCharsets;

@SpringBootTest
public class TestSendFanout {
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * * 发送数据非持久化
     * 向 bunny.fanout 发送消息
     */
    @Test
    void testSendFanout2() throws Exception {
        // 创建消息-非持久化消息
        Message message = MessageBuilder.withBody("hello world".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();

        // 发送很多消息
        for (int i = 0; i < 1000000; i++) {
            rabbitTemplate.convertAndSend("bunny.fanout", null, message);
        }
    }
}

持久化消息

setDeliveryMode(MessageDeliveryMode.PERSISTENT)设置持久化消息。

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.nio.charset.StandardCharsets;

@SpringBootTest
public class TestSendFanout {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void testSendFanout3() throws Exception {
        // 创建消息-持久化消息
        Message message = MessageBuilder
                .withBody("hello".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();

        // 发送很多消息
        for (int i = 0; i < 1000000; i++) {
            rabbitTemplate.convertAndSend("bunny.fanout", null, message);
        }
    }
}

惰性队列

惰性队列比其它队列性能好,速度快。

控制台操作

使用控制台添加惰性队列。

在这里插入图片描述

Java方式添加

不基于注解

使用IOC容器注入,使用属性名方式lazy()创建。

@Configuration
public class CreateLazyQueue {
    /**
     * 创建惰性队列
     *
     * @return 惰性队列
     */
    @Bean
    public Queue lazyQueue() {
        return QueueBuilder.durable("lazy.queue1").lazy().build();
    }
}
基于注解

也是比较常见的方式,这种也方便。

@Queue(name = "lazy.queue2", durable = "true", autoDelete = "false", arguments = @Argument(name = "x-queue-mode", value = "lazy"))

全部代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class LazyListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "lazy.queue2", durable = "true", autoDelete = "false", arguments = @Argument(name = "x-queue-mode", value = "lazy")),
            exchange = @Exchange(name = "lazy.fanout", type = ExchangeTypes.FANOUT, durable = "true", autoDelete = "false")
    ))
    public void listenLazyQueue1(String message) {
        System.out.println("消费者1接收到 Lazy 消息:【" + message + "】");
    }
}

更新已有队列为lazy模式

对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。

命令行方式

可以基于命令行设置policy

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues:策略的作用对象,是所有的队列
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
控制台方式

在这里插入图片描述

消费者的可靠性

[!tip]

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false	

Java中配置

当消息中出现异常时,自动转到错误交换机和错误队列中,方便开发和调试人员查看。

创建error.direct,设置key为error,队列为error.queue

当然,如果在配置中没有设置错误机制这时某些配置也没有必要加载进来,当配置spring.rabbitmq.listener.simple.retry.enabledtrue时才启用当前配置。

@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")// 当开启错误重试这个配置才有效果

示例代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")// 当开启错误重试这个配置才有效果
@Slf4j
public class ErrorConfiguration {
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        log.info("加载错误交换机");
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
只查看不做处理

如果只是想在MQ控制台中查看错误消息,并不需要监听错误消息,这时可以使用之前IOC注入方式配置。

创建交换机、队列,之后绑定设置需要传入的key。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")// 当开启错误重试这个配置才有效果
@Slf4j
public class ErrorConfiguration {
    @Bean
    public DirectExchange errorMessageExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue", true);
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        log.info("加载错误交换机");
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
测试配置

为了验证我们配置的是否有问题,可以在监听消息时手动抛出异常。

对消息进行错误测试,在消息中抛出异常。

/**
 * * 监听者3
 * 创建队列 持久化的、不自动删除
 * 创建交换机 持久化的、不自动删除
 * key包含 blue 和 yellow
 *
 * @param message 接受消息
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue3", durable = "true", autoDelete = "false"),
        exchange = @Exchange(name = "bunny.direct", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),
        key = {"blue", "yellow"}
))
public void listenDirectQueue3(String message) {
    System.out.println("消费者3接收到 Direct key 为 {\"blue\", \"yellow\"} 消息:【" + message + "】");
    throw new RuntimeException("错误消息");
}

在这里插入图片描述

对消息进行监听处理

如果需要对错误队列进行监听并且做出相应处理,使用注解方式,可以直接创建并监听消息。

其中自动绑定了消息队列,和自动创建了错误交换机。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
@Slf4j
public class ErrorListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "error.queue", durable = "true"),
            exchange = @Exchange(name = "error.direct", type = ExchangeTypes.DIRECT),
            key = "error"
    ))
    @RabbitListener(queues = "error.queue")
    public void listenError(String message) {
        System.out.println(LocalDateTime.now() + "收集错误队列-消费者接收到 error 消息:【" + message + "】");
    }
}

在这里插入图片描述

全部的配置

spring:
  application:
    name: demo-mq
  rabbitmq:
    host: 192.168.1.6 # 主机地址
    port: 5672 # 端口
    virtual-host: /bunny # 虚拟主机
    username: bunny # 用户名
    password: "02120212" # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
        acknowledge-mode: auto # 确认机制
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始失败等待时长
          multiplier: 1 # 下次失败等待时间被树,下次等待时长 multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true 无状态 false 有状态。如果业务中包含事务,这里改为false
    connection-timeout: 1s # 设置mq连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 200ms # 失败后初始等待时间
        multiplier: 1 # 失败后下次等待时长倍数,发送消息失败不会走这个
        max-attempts: 3 # 最大重试次数
    publisher-confirm-type: none # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

业务幂等性

什么事幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。 在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。 然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

生成消息唯一ID

SpringAMQPMessageConverter自带了MessageID的功能,我们只要开启这个功能即可。 以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

延迟消息

对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。

DelayExchange插件

官方文档说明

Scheduling Messages with RabbitMQ | RabbitMQ - Blog

插件下载地址

GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

在这里插入图片描述

安装延迟插件

找到RabbitMQ镜像插件位置。

docker inspect mq
# 或者执行
docker volume inspect mq-plugins

在这里插入图片描述

将插件拖入rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez

在这里插入图片描述

执行命令

rabbitmq_delayed_message_exchange是你的插件名称。

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

Java中使用

创建延迟消息,延迟消息如果很多而且延迟时间较长不建议使用MQ去处理这些消息,因为在内部会维护一个时钟,如果消息很大时间又长,对于系统资源消耗会很大。

如果时间很长可以使用Redis去处理这些内容。

不基于注解

使IOC容器方式创建延迟消息。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class CreateDirectExchange {
    /**
     * 创建延迟交换机
     *
     * @return 延迟交换机
     */
    @Bean
    public DirectExchange delayExchange() {
        return ExchangeBuilder.directExchange("delay.direct")
                .delayed()// 设置delay的属性为true
                .durable(true)// 持久化
                .build();
    }
}
基于注解

使用注解方式一次性创建交换机、队列、延迟消息。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class DelayListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelay(String message) {
        System.out.println("消费者接收到 delay 消息:【" + message + "】");
    }
}

创建延迟消息会有独特的tag。

在这里插入图片描述

创建队列

在这里插入图片描述

发送延迟消息

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
@Slf4j
public class TestSendDelay {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void testSendDelay() throws Exception {
        rabbitTemplate.convertAndSend("delay.direct", "delay", "延迟消息", message -> {
            message.getMessageProperties().setDelayLong(5000L);
            return message;
        });
        log.info("延迟消息发送成功");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1527219.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RK3588_Qt交叉编译环境搭建

buildroot编译 进入 /home/linux/plat/rk3588/sdk/buildroot 目录下&#xff0c;执行 Source ./envsetup.sh 选择具体平台编译&#xff0c;后再执行make编译 /home/linux/plat/rk3588/sdk/buildroot/output/OK3568/images 生成的rootfs.ext2镜像重新烧写到rk3568开发板中&…

计算地球圆盘负荷产生的位移

1.研究背景 计算受表面载荷影响的弹性体变形问题有着悠久的历史&#xff0c;涉及到许多著名的数学家和物理学家&#xff08;Boussinesq 1885&#xff1b;Lamb 1901&#xff1b;Love 1911&#xff0c;1929&#xff1b;Shida 1912&#xff1b;Terazawa 1916&#xff1b;Munk &…

22款Visual Studio Code实用插件推荐

前言 Visual Studio Code是一个轻量级但功能强大的源代码编辑器&#xff0c;轻量级指的是下载下来的Visual Studio Code其实就是一个简单的编辑器&#xff0c;强大指的是支持多种语言的环境插件拓展&#xff0c;也正是因为这种支持插件式安装环境开发让Visual Studio Code成为…

计算机二级C语言的注意事项及相应真题-5-程序设计

目录 41.计算下列级数和&#xff0c;和值由函数值返回42.统计出x所指数组中能被e整除的所有元素&#xff0c;这些元素的和通过函数值返回主函数&#xff0c;元素个数通过形参num返回主函数43.使数组右上三角元素中的值乘以m44.将a、b中的两个两位正整数合并形成一个新的整数放在…

3.18数据结构

一、数据结构----->用来组织存储数据 一组用来保存一种或多种特定关系的数据的集合&#xff08;组织和存储数据&#xff09; 程序 数据结构 算法 MVC&#xff1a;软件设计架构 M&#xff1a;数据的管理&#xff08;数据结构&#xff09; V&#xff1a;视图&#xff0c…

SQLiteC/C++接口详细介绍之sqlite3类(十八)

返回目录&#xff1a;SQLite—免费开源数据库系列文章目录 上一篇&#xff1a;SQLiteC/C接口详细介绍之sqlite3类&#xff08;十七&#xff09; 下一篇&#xff1a;SQLiteC/C接口详细介绍sqlite3_stmt类&#xff08;一&#xff09; ​ 56.sqlite3_update_hook 函数功能&am…

【边缘智能】Jetson板卡上安装QT5与OpenCV集成

学习《OpenCV应用开发&#xff1a;入门、进阶与工程化实践》一书 做真正的OpenCV开发者&#xff0c;从入门到入职&#xff0c;一步到位&#xff01; 安装QT5与QT Creator 如果只是简单的使用QT的GUI库&#xff0c;没有其它要求&#xff0c;其实特别容易&#xff0c;一行命令行…

计算机视觉之三维重建(1)---摄像机几何

文章目录 一、针孔模型和透镜1.1 针孔摄像机1.2 近轴折射模型1.3 透镜问题 二、摄像机几何2.1 像平面和像素平面2.2 齐次坐标下的投影变换2.3 摄像机倾斜2.4 规范化摄像机2.5 世界坐标系2.6 Faugeras定理2.7 投影变换性质&#xff1a; 三、其他投影摄像机模型3.1 弱透视投影摄像…

php版本的AI电话机器人系统有哪些优势

PHP版本的AI电话机器人系统具有以下优势&#xff1a; 提升客户体验&#xff1a;AI电话机器人能够为客户提供724小时的服务&#xff0c;无论何时客户有疑问或需要帮助&#xff0c;都可以得到及时响应1。 提高工作效率和客户满意度&#xff1a;AI电话机器人系统具有智能回答问题…

oracle创建序列

oracle创建序列 oracle创建序列00-查看当前用户创建的序列01-创建序列02-创建序列方法二03-序列使用04-删除序列 oracle创建序列 00-查看当前用户创建的序列 SELECT * FROM user_sequences;01-创建序列 --01-创建序列 /* 语法&#xff1a; CREATE SEQUENCE 序列名称 START W…

python知识点总结(四)

这里写目录标题 1、Django 中的缓存是怎么用的&#xff1f;2、现有2元、3元、5元共三种面额的货币&#xff0c;如果需要找零99元&#xff0c;一共有多少种找零的方式?3、代码执行结果4、下面的代码执行结果为&#xff1a;5、说一下Python中变量的作用域。6、闭包7、python2与p…

Hive:数据仓库利器

1. 简介 Hive是一个基于Hadoop的开源数据仓库工具&#xff0c;可以用来存储、查询和分析大规模数据。Hive使用SQL-like的HiveQL语言来查询数据&#xff0c;并将其结果存储在Hadoop的文件系统中。 2. 基本概念 介绍 Hive 的核心概念&#xff0c;例如表、分区、桶、HQL 等。 …

深入浅出Go的`encoding/xml`库:实战开发指南

深入浅出Go的encoding/xml库&#xff1a;实战开发指南 引言基本概念XML简介Go语言中的XML处理结构体标签&#xff08;Struct Tags&#xff09; 解析XML数据使用xml.Unmarshal解析XML结构体标签详解处理常见解析问题 生成XML数据使用xml.Marshal生成XML使用xml.MarshalIndent优化…

服务器中了mallox勒索病毒还能恢复数据吗?

什么是mallox勒索病毒&#xff1f; mallox是一种最近多发的勒索病毒类型&#xff0c;它主要针对企业的Web应用和数据库服务器进行攻击。mallox后缀的勒索病毒会加密用户的重要文件数据并以此为要挟索要赎金。该类病毒会绕过企业的防火墙和各种防护软件&#xff0c;对目标设备进…

带你深入了解数据库的事务

为什么要使用事务 日常开发中&#xff0c;很多操作&#xff0c;不是通过一个SQL就能完成的&#xff0c;往往需要多个SQL配合完成 当执行多个SQL操作的时候&#xff0c;如果中间出现了特殊的情况&#xff08;程序崩溃&#xff0c;系统奔溃&#xff0c;网络断开&#xff0c;主机…

R语言:microeco:一个用于微生物群落生态学数据挖掘的R包:第七:trans_network class

# 网络是研究微生物生态共现模式的常用方法。在这一部分中&#xff0c;我们描述了trans_network类的所有核心内容。 # 网络构建方法可分为基于关联的和非基于关联的两种。有几种方法可以用来计算相关性和显著性。 #我们首先介绍了基于关联的网络。trans_network中的cal_cor参数…

进程的一些概述

文章目录 前言一、进程以及查看指令二、查看进程 前言 进程是什么&#xff1f;进程应该如何描述&#xff1f;进程又该如何管理&#xff1f;进程如何查看&#xff1f; 一、进程以及查看指令 一个正在运行的程序&#xff0c;加载到内存中的程序是进程也叫做任务,用任务资源管理…

从单机到分布式微服务,大文件校验上传的通用解决方案

一、先说结论 本文将结合我的工作实战经历&#xff0c;总结和提炼一种从单体架构到分布式微服务都适用的一种文件上传和校验的通用解决方案&#xff0c;形成一个完整的方法论。本文主要解决手段包括多线程、设计模式、分而治之、MapReduce等&#xff0c;虽然文中使用的编程语言…

在Visual Studio中调试 .NET源代码

前言 在我们日常开发过程中常常会使用到很多其他封装好的第三方类库&#xff08;NuGet依赖项&#xff09;或者是.NET框架中自带的库。如果可以设置断点并在NuGet依赖项或框架本身上使用调试器的所有功能&#xff0c;那么我们的源码调试体验和生产效率会得到大大的提升。今天我…

openKylin系统安装ssh服务结合内网穿透实现固定公网地址访问

文章目录 1. 安装SSH服务2. 本地SSH连接测试3. openKylin安装Cpolar4. 配置 SSH公网地址5. 公网远程SSH连接小结 6. 固定SSH公网地址7. SSH固定地址连接 openKylin是中国首个基于Linux 的桌面操作系统开发者平台&#xff0c;通过开放操作系统源代码的方式&#xff0c;打造具有自…