RabbitMQ入门到实战一篇文章就够了

news2024/12/30 3:44:23

RabbitMQ

课程内容

  • 认识RabbitMQ
  • 安装RabbitMQ
  • SpringBoot使用RabbitMQ
  • 其他特性

一.RabbitMQ入门

1.RabbitMQ认识

1.1.RabbitMQ是什么

MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用非常广泛。官方地址:http://www.rabbitmq.com/

1.2.RabbitMQ的使用场景

开发中消息队列通常有如下应用场景: 消峰,解耦,提速,大数据处理

  • 消除峰值:用于高并发场景消除峰值,让并发请求在mq中进行排队

  • 大数据处理:由于数据量太大,程序一时处理不过来,可以通过把数据放入MQ,多开几个消费者去处理消息,比如:日志收集等

  • 服务异步/解耦 :服务之间通过RPC进行通信的方式是同步方式,服务消费方需要等到服务提供方相应结果后才可以继续执行,使用MQ之后的服务通信是异步的,服务之间没有直接的调用关系,而是通过队列进行服务通信, 应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

  • 排序保证 FIFO :遵循队列先进先出的特点,可以保证数据按顺序消费

除此之外使用MQ还可以达到:提高系统响应速度,提高系统稳定性的目的。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。 提高了应用程序的响应时间。另外如果系统挂了也没关系,数据放到消息队列.后续可以继续消费

但是需要注意的是:对数据的一致性要求较高的业务场景不适合使用MQ,因为MQ具有一定的数据延迟

1.3.AMQP协议

AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题,RabbitMQ就是遵循AMQP标准协议开发的MQ服务。 官方:http://www.amqp.org

1.4.JMS是什么 ?

JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么 不同,jms是java语言专属的消 息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的 。

2.RabbitMQ的工作流程

2.1.RabbitMQ中核心概念
  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。

  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。exchange有下面四种(先了解:fanout,direct,topics,header)

  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。

  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

2.2.RabbitMQ的工作流程

在这里插入图片描述

消息发布接收流程:

1.发送消息

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

4、Exchange将消息转发到指定的Queue(队列)

消息接收消息

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者。

5、消费者接收到消息。

3.RabbitMQ安装

3.1.Docker安装RabbitMQ
1.下载docker镜像
docker pull rabbitmq:3-management
2.启动容器

需要开放端口:5672是程序连接的端口,15672是可视化界面接口

docker run -id --name=rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
3.访问管理界面

安装好之后,访问:15672界面如下,账号和密码都是 guest

在这里插入图片描述

二.SpringBoot中使用RabbitMQ

1.SpringBoot整合RabbitMQ

1.1.导入依赖

第一步需要导入mq的基础依赖,SpringBoot使用的是2.6.13

	<!--SpringBoot依赖-->
    <parent>
        <groupId> org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.13</version>
    </parent>

	<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
2.2.配置RabbitMQ

第二步:配置mq,主要是配置连接信息

server:
  port: 10200
spring:
  application:
    name: rabbitmq‐application
  rabbitmq:
    host: 60.204.187.34
    port: 5672
    username: guest
    password: guest
    virtualHost: /

编写启动类,省略…

2.HelloWorld(直连模型)

2.1.模型认识

rabbitMQ提供了7种消息模型。https://www.rabbitmq.com/tutorials

在这里插入图片描述

我们使用Hello World 案例来入门,这种模式比较简单,只需要一个生产者,一个队列,一个消费者即可

在这里插入图片描述

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,用来存储消息的,生产者把消息发送到队列中,消费者从队列中消费消息。类似一个邮箱,可以缓存消息;

我们需要做如下事情

  1. 创建一个队列
  2. 编写消费者,监听该队列
  3. 编写生产者发送消息,指定该队列名
2.2.配置队列

在SpringBoot中交换机和队列的创建都通过Bean的方式来进行,下面定义了一个队列,名字为hello:

@Configuration
public class RabbitmqConfig {
    //定义消息队列的名字
    public static final String NAME_HELLO = "queue_hello";

    @Bean
    public Queue queue() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(NAME_HELLO,true);
    }
}
2.3.编写消费者

rabbitmq通过@RabbitListener(queues = {队列名}) 来监听队列,从而消费消息

@Component
public class ReceiveHandler {

    //监听NAME_HELLO队列
    @RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
    public void receiveHelloQueueMessage(String msg, Message message, Channel channel) {
        System.out.println("消费者收到消息:"+msg);
    }
}
2.4.编写MQ生产者

通过注入:RabbitTemplate发送消息,以及消息内容。

@RestController
public class SenderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/sender/hello/{message}")
    public String senderHello(@PathVariable String message) {
        /**
         * 参数说明
         * exchnage: 交换机,默认交换机指定为“”即可
         * routingKey :发送消息的路由键,该模式下使用队列名即可
         * message:消息的内容
         */
        rabbitTemplate.convertAndSend("", RabbitmqConfig.NAME_HELLO,message);
        return "success";
    }
}

注意:这个的交换机使用的是默认的交换机"" ,路由键直接指定为队列的名字。其实在MQ中是提供了几个默认的交换机,当我们把交换机指定为 “” , 就会使用默认的交换机来转发消息,而我们创建的队列会和默认的交换机进行绑定,如下:

在这里插入图片描述

下面是绑定关系图

在这里插入图片描述

2.5.测试

启动程序访问controller进行测试,控制台可以看到消费者打印的日志,打开MQ的可视化界面可以看到创建的队列,之所以里面没有消息是因为消息被消费了。

在这里插入图片描述

3.WorkQueue(工作队列)

3.1.WorkQueue模型认识

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的,即:同一个消息只会被一个消费者消费

在这里插入图片描述

3.2.代码演示

WorkQueue和HelloWorld本身无区别,只是在HelloWorld的基础上多增加消费者而已,如下:

@Component
public class ReceiveHandler {

    //监听NAME_HELLO队列
    @RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
    public void receive1(String msg, Message message, Channel channel) {
        System.out.println("消费者1收到消息:"+msg);
    }

    //监听NAME_HELLO队列
    @RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
    public void receive2(String msg, Message message, Channel channel) {
        System.out.println("消费者2收到消息:"+msg);
    }
}

连续多次发送消息MQ会使用轮询方式把消息评价分配给多个消费者

在这里插入图片描述

3.3.指定拉取数量

这种消费模式有一个问题,当某个消费者消费能力偏弱会导致后续的消息阻塞,我们可以通过 prefetch 来指定消费者每次只能拉取一个消息,这样的话当某个消费者正在忙碌,那么MQ会把消息推送给别的消费者,防止消息在某个消费者身上发生阻塞。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1

4.fanout-广播模型

4.1.模型认识

在上面的案例中,我们采用一个队列来发送消息,及时同一个队列监听了多个消费者,同一个消息也只会给到其中一个消费者,而发布订阅模型允许一个消息向多个消费者投递。而对于:fanout , direct , topics都属于发布订阅模型。

在这里插入图片描述

RabbitMQ的exchnage正好有4中类型,就对应了上述的几种订阅模型,源码如下:

public abstract class ExchangeTypes {
    public static final String DIRECT = "direct";
    public static final String TOPIC = "topic";
    public static final String FANOUT = "fanout";
    public static final String HEADERS = "headers";
    public static final String SYSTEM = "system";
}

Fanout被叫做广播模型,它的特点是当生产者把消息投递给交换机,交换机会把消息投递给和它绑定的所有队列,而相应的所有的消费者都能收到消息,如上图。要实现Fanout模型我们要做如下几个事情

  1. 定义自己的fanout类型的交换机
  2. 定义多个队列
  3. 把队列和交换机进行绑定
  4. 消费者监听不同队列
4.2.配置交换机和队列

下面配置了一个fanout类型的交换机和2个队列,并把队列绑定到了交换机

@Configuration
public class RabbitmqConfigFanout {
    //定义消息队列的名字
    public static final String QUEUE_1 = "queue1";
    public static final String QUEUE_2 = "queue2";
    public static final String EXCHANGE_FANOUT = "exchnage-fanout";


    @Bean
    public Exchange exchange(){
        //定义一个fanout类型的交换机,并指定持久化
        return ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT).durable(true).build();
    }

    @Bean
    public Queue queue1() {
        //创建一个队列队列,并指定队列的名字和持久化
        return new Queue(QUEUE_1,true);
    }

    @Bean
    public Queue queue2() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(QUEUE_2,true);
    }

    @Bean
    public Binding bindingQueue1() {
        //fanout模式不指定routingkey
        return BindingBuilder
                .bind(queue1()).to(exchange()).with("").noargs();
    }
    @Bean
    public Binding bindingQueue2() {
        return BindingBuilder
                .bind(queue2()).to(exchange()).with("").noargs();
    }
}
4.3.编写消费者

消费者只需要监听不同的队列即可

    @RabbitListener(queues = {RabbitmqConfigFanout.QUEUE_1})
    public void receiveFanout1(String msg, Message message, Channel channel) {
        System.out.println("fanout消费者1收到消息:"+msg);
    }

    @RabbitListener(queues = {RabbitmqConfigFanout.QUEUE_2})
    public void receiveFanout2(String msg, Message message, Channel channel) {
        System.out.println("fanout消费者2收到消息:"+msg);
    }

4.4.编写生产者

生产者发送消息的时候需要指定exchange的名字,注意:routingkey不需要指定

    @PostMapping("/sender/fanout/{message}")
    public String senderFanout(@PathVariable String message) {
        /**
         * 参数说明
         * exchnage: 交换机,使用自定义的交换机
         * routingKey :发送消息的路由键,fanout模式指定为“”
         * message:消息的内容
         */
        rabbitTemplate.convertAndSend(RabbitmqConfigFanout.EXCHANGE_FANOUT, "",message);
        return "success";
    }
4.5.测试

启动测试,发送一个消息2个消费者都能收到

5.Routing(路由模型)

5.1.模型认识

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费,我们就要用的routing路由模式,这种模式是通过一个routingkey来收发消息。交换机的类型使用direct

在这里插入图片描述

如上图:不同的队列在绑定到交换机时指定的routingkey是不一样的,这样一来我们发送消息的时候,就可以通过不同的routingkey来把消息发送到不同的队列中,从而使不同的消费者去消费,该模型我们需要做如下几个步骤

  1. 创建direct类型的交换机
  2. 创建多个队列
  3. 把队列绑定到交换机,但是绑定时需要指定不同的routingkey
  4. 消费者消费不同队列的消息
5.2.配置交换机和队列

这里定义了一个direct类型的交换机,以及2个队列,队列在绑定到交换机时采用了不同的routingkey.

@Configuration
public class RabbitmqConfigDirect {
    //定义消息队列的名字
    public static final String QUEUE_DIRECT_1 = "direct_queue1";
    public static final String QUEUE_DIRECT_2 = "direct_queue2";
    public static final String EXCHANGE_DIRECT = "exchnage-direct";


    @Bean
    public Exchange exchange(){
        //定义一个direct类型的交换机,并指定持久化
        return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build();
    }

    @Bean
    public Queue queue1() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(QUEUE_DIRECT_1,true);
    }

    @Bean
    public Queue queue2() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(QUEUE_DIRECT_2,true);
    }

    @Bean
    public Binding bindingQueue1() {
        return BindingBuilder
                .bind(queue1()).to(exchange()).with("pay").noargs();
    }
    @Bean
    public Binding bindingQueue2() {
        return BindingBuilder
                .bind(queue2()).to(exchange()).with("order").noargs();
    }
}
5.3.配置消费者

消费者消费不同队列中的消息即可

    @RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_1})
    public void receiveDirect1(String msg, Message message, Channel channel) {
        System.out.println("receiveDirect1消费者1收到消息:"+msg);
    }

    @RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_2})
    public void receiveDirect2(String msg, Message message, Channel channel) {
        System.out.println("receiveDirect2消费者2收到消息:"+msg);
    }

5.4.定义生产者

生产者需要指定自己的交换机,以及routingkey,指定不同的routingkey决定了消息会发送到不同的队列中

    @PostMapping("/sender/direct/{message}")
    public String senderDirect(@PathVariable String message) {
        /**
         * 参数说明
         * exchnage: 交换机,使用自定义的交换机
         * routingKey :发送消息的路由键,fanout模式指定为“”
         * message:消息的内容
         */
        rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "pay",message);
        rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "order",message);
        return "success";
    }

6.Topics(通配符)

6.1.模型认识

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

在这里插入图片描述

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

audit.#:能够匹配audit.irs.corporate 或者 audit.irs

audit.*:只能匹配audit.irs

topic 和 direct 没有本质的区别,只是在绑定队列时可以使用通配符

6.2.定义交换机和队列

需要定义topics类型的交换机和2个队列,绑定队列的时候指定routingkey,可以使用通配符来指定

@Bean
public Binding bindingQueue1() {
    return BindingBuilder
        .bind(queue1()).to(exchange()).with("#.pay").noargs();
}
@Bean
public Binding bindingQueue2() {
    return BindingBuilder
        .bind(queue2()).to(exchange()).with("#.order").noargs();
}
6.3.编写生产者

发送消息的时候指定的routingkey如果能命中绑定时的routingkey消息就可以发送到相应的队列中,比如:

rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "good.pay",message);
rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "account.pay",message);
rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "good.order",message);

前2个消息的routingkey可以命中 #.pay, 第3条消息可以命中 #.order.

三.RabbitMQ其他特性

1.签收机制

1.1.自动签收的问题

在RabbitMQ中包含手动签收和自动签收2钟模式,上述案例都采用的是自动签收,也就是当MQ吧消息投递给消费者后,消息默认被签收,MQ就会直接把消息删除掉。这种模式可能会导致消息丢失分享:比如消费者拿到消息并未成功消费,但是MQ已经把消息删除,从而造成了消息的丢失,所以在司机开发中尽量使用手动签收

1.2.手动牵手

手动签收模式意味着MQ不会自动签收消息,而是把消息推送给消费者后,等到消费者自己去签收消息后,再删除队列中的消息,这种模式可以防止消息丢失。我们可以通过下面配置来指定签收模式

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #默认是 auto 自动签署

然后在消费者成功消费完消息后,触发手动签收,代码如下

@RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_2})
public void receiveDirect2(String msg, Message message, Channel channel) throws IOException {
    System.out.println("receiveDirect2消费者2收到消息:"+msg);
    //拿到消息的tag
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    //签收消息:指定消息的tags ,以及不做批量签收
    channel.basicAck(deliveryTag,false);
}

channel.basicAck : 签收消息

  • deliveryTag :消息的标签,代表了一条消息

除此之外我们还可以不签收消息,或者拒绝消息.不签收的消息会一直重复消费,而被拒绝的消息会丢弃掉

//不签收
channel.basicNack(deliveryTag,false,false);
//拒绝消息
channel.basicReject(deliveryTag,false);

2.持久化

mq消息在内存中进行读写,如果MQ宕机那么消息有丢失的风险,我们需要通过持久化来防止消息丢失

2.1.交换机持久化

创建交换机的时候,指定durable属性为true

@Bean
public Exchange exchange(){
    //定义一个direct类型的交换机,并指定持久化
    return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build();
}
2.2.队列持久化

创建队列时指定durable属性为true

@Bean
public Queue queue1() {
    //创建一个队列队列,并指定队列的名字
    return new Queue(QUEUE_DIRECT_1,true);
}
2.3.消息持久化

当我们发送一个消息内容的时候,SpringBoot会自动帮我们持久化

rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "order",message);

底层会自动构建Message对象,Messge对象中有一个MessageProperties属性,它包含了MessageDeliveryMode.PERSISTENT持久化和NON_PERSISTENT不持久化2中方式。

3.可靠消息投递

3.1.回调接口介绍

在RabbitTemplate中提供了2个接口

  • ConfirmCallback : 消息投递到Brocker后触发回调,可以用来检测消息是否到达RabbitMQ
  • ReturnsCallback : 消息发送失败回调,比如队列路由失败

要开启上面2中回调需要在yaml中做如下配置

spring:
  rabbitmq:
    publisher-returns: true #开启returnCallback回调
    template:
      mandatory: true #消息会返回给发送者的回调,而不是丢弃
    publisher-confirm-type: correlated #开启ConfirmCallback 回调

然后需要编写回调接口,通过实现 RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback

@Component
@Slf4j
public class RabbitMQCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        //ReturnedMessage 消息对象中包括:交换机,路由key,消息内容等
        log.info(returnedMessage.getExchange()
                +","+returnedMessage.getRoutingKey()
                +","+new String(returnedMessage.getMessage().getBody()));
        //把失败的消息再次发送
        rabbitTemplate.convertAndSend(returnedMessage.getExchange(),
                                      returnedMessage.getRoutingKey(),returnedMessage.getMessage());
    }

    /**
     * @param correlationData :消息的唯一标识
     * @param ack :消息确认结果
     * @param cause :错误原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info(correlationData.getId() +","+ack +","+cause);
    }
}
  • ConfirmCallback : 消息投递到Brocker的exchange后触发回调,可以用来检测消息是否到达RabbitMQ,通过confirm来把投递结果返回,通过ack我们可以判断消息是否投递到MQ中。
  • ReturnsCallback : 消息发送失败回调,比如队列路由失败。通过 returnedMessage 来把失败的消息返回,我们可以通过该方法对失败的消息进行重试发送

在这里插入图片描述

然后自定义template,把2个回调设置给template


//以下配置RabbitMQ消息服务
@Autowired
public ConnectionFactory connectionFactory;

@Autowired
private RabbitMQCallback rabbitMQCallback;

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    // 这里的转换器设置实现了发送消息时自动序列化消息对象为message body
    template.setMandatory(true);
    template.setReturnsCallback(rabbitMQCallback);
    template.setConfirmCallback(rabbitMQCallback);

    return template;
}
3.2.可靠消息投递方案

根据上面2个回调接口的特性,我们可以做一个可靠消息投递方案,方案如下:

  1. 设计一个消息日志表,可以基于数据库,也可以基于Redis,字段有:交换机,路由key, 消息内容,发送状态(发送中,发送成功,发送失败),重试次数等。
  2. 在使用rabbitTemplate发布消息之前,把消息的内容持久化到 :消息日志表中,状态为:发送中
  3. 通过回调来监听消息发送结果,如果成功,把消息日志状态修改为:成功,如果发送失败,把消息日志修改为:失败
  4. 额外创建定时任务,定时读取失败的日志进行重试发送,并增加重试次数,直到发送成功,如果发送到一定次数依然不成功就不再重试,而是通知管理员抢修。

在这里插入图片描述

4.延迟队列

4.1.为什么要用

在开发项目的时候我们通常会遇到这么一个问题,比如商城项目有一下单逻辑,下单成功数据保存在数据库中,下单成功后需要用户进行支付,如果在30分钟内支付失败,需要修改订单的支付状态为“支付超时”并关闭订单以及回退库存操作,那如何在下单30后准时检查支付结果处理订单状态呢?

你可能想到了一个最简单的方法,就是使用定时任务扫描订单表,判断时间是否支付超时,这样的方式无疑是一种很消耗性能的做法,你试想一下,定时扫描一张数据量很大的表去判断时间和状态,而且99%的扫描都是无效的操作。

那么该如何优雅的解决上述问题呢?我们可以采用延迟队列来实现,Redis和MQ都可以做到,本文章采用RabbitMQ的延迟队列来实现。

4.2.延迟队列实现原理

说到延迟队列就要说一说消息的过期时间(存活时间)TTL,RabbitMQ可以给队列设置过期时间,也可以单独给每个消息设置过期时间,如果到了过期时间消息没被消费该消息就会标记为死信消息。

除此之外还有那些消息会成为死信消息?

  • 一是设置了TTL的消息到了TTL过期时间还没被消费,会成为死信
  • 二是消息被消费者拒收,并且reject方法的参数里requeue是false,意味这这个消息不会重回队列,该消息会成为死信,
  • 三是由于队列大小限制,新的消息进来队列可能满了,MQ会淘汰掉最老的消息,这些消息可能会成为死信消息

成为死信的消息会进入一个死信交换机(Dead Letter Exchange)中,死信交换机也是一个普通的交换机而已,根据这一特点,我们可以准备一个队列来接收死信交换机中的死信消息,然后准备一个消费者来消费该队列中的消息,这样一来我们的延迟队列就有思路了,还是按照订单为例流程如下:
在这里插入图片描述

  1. 下单成功(生产者),加入下单消息到队列(order.message)
  2. 队列设置TTL过期时间(10000毫秒),同时指定了死信交换机“delay-exchange”和死信交换机转发消息的队列“delay-message”
  3. 消息进入队列,等待一段时间,如果TTL时间到,订单消息会被MQ扔给死信交换机,死信交换机会把消息扔给指定的死信队列delay-message
  4. 消费者正好监听了死信队列delay-message,就可以获取到消息进行消费,比如检查该消息对应的订单是否支付,做出退库存处理等。

整体效果就是,消息进入order.message队列 延迟 10秒后就 会进入delay-message队列然后被消费者消费处理,这就是一个延迟队列的效果。

注意,这里的delay-exchange死信交换机其实就是一个普通的交换机而已,所以我们可以把上面的两个交换机合并成一个,如下:
在这里插入图片描述

4.3.延迟队列实战

第一步,定义交换机和队列

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

//rabbitMQ的配置
@Configuration
public class MQConfig {
    //交换机
    public static final String EXCHNAGE_DELAY = "EXCHNAGE_DELAY";
    //订单队列,该队列中的消息设置过期时间
    public static final String QUEUE_ORDER = "QUEUE_ORDER";
    //该队列用来接收死信交换机转发过来的消息
    public static final String QUEUE_DELAY = "QUEUE_DELAY";
    //队列的路由键,该路由键用来接收订单消息传出到订单队列
    public static final String ROUTINGKEY_QUEUE_ORDER = "ROUTINGKEY_QUEUE_ORDER";
    //该路由键用来接收死信交换机转发过来的消息
    public static final String ROUTINGKEY_QUEUE_DELAY = "ROUTINGKEY_QUEUE_DELAY";

    //定义交换机
    @Bean
    public Exchange exchangeDelay(){
        return ExchangeBuilder.topicExchange(EXCHNAGE_DELAY).durable(true).build();
    }
    //该队列中的消息需要设置ttl
    @Bean
    public Queue queueOrder(){
        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange", EXCHNAGE_DELAY);    //过期的消息给哪个交换机的名字
        map.put("x-dead-letter-routing-key", ROUTINGKEY_QUEUE_DELAY);   //死信交换机把消息个哪个个routingkey
        map.put("x-message-ttl", 10000);    //队列过期时间10s
        return new Queue(QUEUE_ORDER,true,false,false,map);
    }
    //该队列接收死信交换机转发过来的消息
    @Bean
    public Queue queueDelay(){
        return new Queue(QUEUE_DELAY,true);
    }
    @Bean
    public Binding queueOrderBinding(){
        return BindingBuilder.bind(queueOrder()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_ORDER).noargs();
    }
    @Bean
    public Binding queueDelayBinding(){
        return BindingBuilder.bind(queueDelay()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_DELAY).noargs();
    }
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

第二步,写一个消息发送者

System.out.println("发送消息:我是一个延迟消息,开始时间:"+System.currentTimeMillis());
rabbitTemplate.convertAndSend(
    MQConfig.EXCHNAGE_DELAY,
    MQConfig.ROUTINGKEY_QUEUE_ORDER,
    "我是一个延迟消息"
);

第三步,写一个消费者

@Component
public class Consumer {

    @RabbitListener(queues = MQConfig.QUEUE_DELAY)
    public void handler(String message){
        System.out.println("收到消息:"+message+",结束时间:"+System.currentTimeMillis());
    }
}

第六步,测试效果

  • 生产者执行后,观察MQ,QUEUE_ORDER中有消息

在这里插入图片描述

  • 等待10s之后,消息进入QUEUE_DELAY队列

在这里插入图片描述

  • 控制台打印效果
Producer:   发送消息:我是一个延迟消息,开始时间:1606295976347
Consumer: 收到消息:我是一个延迟消息,结束时间:1606295986418

发送消息到收到消息的时间差为 10071 , 忽略网络开销,延迟时间差不多就是我们设置的TTL时间

5.消息重复消费

因为消息本身是有重试机制或者我们为了保证消息一定能投递成功可能会导致消息多次投递,那么对于消费者而言消息的重复消费处理就变得非常重要。通常我们可以使用消息的唯一标识来避免重复消费,大概思路如下

  1. 找到消息本省的唯一标识,或者在数据中设置一个唯一标识,比如:订单就可以把订单号作为唯一标识
  2. 消费者每次做了消息消费后,会把这个唯一标识记录下来,比如记录到数据库,或者Redis都可以。
  3. 消费者每次消费前都拿到消息的这个唯一标识去判断一下消息是否被消费,如果已经被消费国了就不要再消费了

四.面试必备

  1. MQ的使用场景
  2. RabbitMQ的工作流程
  3. RabbitMQ如何防止消息丢失
  4. RabbitMQ的消息模型有哪几种(交换机有哪几种)
  5. 如何处理消息重复消费
  6. 如何实现延迟队列(如果通过MQ实现订单超时取消)
  7. 什么情况下消息会变成死信消息
  8. 消息的签收模式有哪几种,有什么区别

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1499607.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

德人合科技|天锐绿盾加密软件——数据防泄漏系统

德人合科技是一家专注于提供企业级信息安全解决方案的服务商&#xff0c;提供的天锐绿盾加密软件是一款专为企业设计的数据安全防护产品&#xff0c;主要用于解决企事业单位内部敏感数据的防泄密问题。 www.drhchina.com PC端&#xff1a; https://isite.baidu.com/site/wjz012…

Spring Boot 生成与解析Jwt

Spring Boot 生成与解析Jwt Maven依赖 <dependency><groupId>io.jsonwebtoken</groupId><artifactId>jjwt</artifactId><version>0.9.1</version> </dependency>生成&解析 package yang;import io.jsonwebtoken.Claims…

神经网络(neural network)

在这一章中我们将进入深度学习算法&#xff0c;学习一些神经网络相关的知识&#xff0c;这些是有更加强大的作用&#xff0c;更加广泛的用途。 神经元和大脑(neurons and the brain): 我们对于我们的编程的进步主要来自我们对于大脑的研究&#xff0c;根据我们对于大脑的研究…

耐腐蚀PFA消解管可配四氟回流盖适配海能莱伯泰科全自动石墨消解仪

PFA消解管&#xff0c;也叫PFA消化管、特氟龙消解管、耐高温消解杯等&#xff0c;应用于气相、液相、等离子光谱质谱、原子吸收、原子荧光等化学分析方法的样品前处理&#xff0c;可消解重金属、农残、食品、淤泥、稀土、水产品、有机物等。 PFA消解管 PFA消解管可耐强酸、强碱…

【论文阅读】(2024.03.05-2024.03.15)论文阅读简单记录和汇总

(2024.03.05-2024.03.15)论文阅读简单记录和汇总 2024/03/05&#xff1a;随便简单写写&#xff0c;以后不会把太详细的记录在CSDN&#xff0c;有道的Markdown又感觉不好用。 目录 &#xff08;ICMM 2024&#xff09;Quality Scalable Video Coding Based on Neural Represent…

(2024.03.04)如何打包Android Studio项目?

一、引言 最近帮助很多从未谋面的小伙伴调试了一下AS的项目&#xff0c;调试完成后需要他们查看是否与当初需求一致&#xff0c;此时就需要将已完成的项目打包&#xff0c;即导出APK安装包&#xff0c;发送到他们的手机上就可以下载安装使用了。在这里一是分享给大家打包方法&…

Java项目:41 springboot大学生入学审核系统的设计与实现010

作者主页&#xff1a;源码空间codegym 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 本大学生入学审核系统管理员和学生。 管理员功能有个人中心&#xff0c;学生管理&#xff0c;学籍信息管理&#xff0c;入学办理管理等。 学…

实验1:查看CPU和内存,用机器指令和汇编指令编程

预备知识&#xff1a;Debug的使用 1.Debug&#xff1a; Debug时DOS、Windows都提供的实模式&#xff08;8086 方式&#xff09;程序的调试工具。 debug是Windows 16位或者32位机器上的一款调试工具。 也就是说&#xff0c;在WindowsXP及以前的机器上都有debug&#xff0c;直接…

【C++从0到王者】第五十二站:跳表

文章目录 一、什么是跳表二、skiplist的效率三、skiplist的实现 一、什么是跳表 skiplist本质上也是一种查找结构&#xff0c;用于解决算法中的查找问题&#xff0c;跟平衡搜索树和哈希表的价值是一样的&#xff0c;可以作为key或者key/value的查找模型。 skiplist&#xff0c;…

【PCIe】 PCIe 拓扑结构与分层结构

&#x1f525;博客主页&#xff1a;PannLZ 文章目录 PCIe拓扑结构PCIe分层结构 PCIe拓扑结构 计算机网络中的拓扑结构源于拓扑学(研究与大小、形状无关的点、线关系的方法)。 把网络中的计算机和通信设备抽象为一个点&#xff0c;把传输介质抽象为一条线&#xff0c;由点和线组…

HTTP请求响应详解 (HTTP请求数据格式,常见请求方式,后端响应参数)及Apifox(postman)使用方式

目录 一.HTTP协议 二.HTTP请求数据格式 请求方式 三.后端响应请求 基于SpringBoot响应数据 请求响应的参数类型 同一响应格式 四.Apifox(postman)使用方法 一.HTTP协议 HTTP&#xff08;Hypertext Transfer Protocol&#xff0c;超文本传输协议&#xff09;是一种用…

maven项目引入私有jar,并打包到java.jar中

私有jar存放位置 maven依赖 <dependency><groupId>com.hikvision.ga</groupId><artifactId>artemis-http-client</artifactId><version>1.1.10</version><scope>system</scope><systemPath>${project.basedir}/s…

JS实现chatgpt数据流式回复效果

最近高了一个简单chatgpt对话功功能&#xff0c;回复时希望流式回复&#xff0c;而不是直接显示结果&#xff0c;其实很简单&#xff0c;前端流式读取即可&#xff0c;后端SSE实现流式传输 前端用到fetch获取数据&#xff0c;然后利用reader读取 let requestId parseInt(Ma…

笛量智能加入飞桨技术伙伴计划,共同打造“AI+私域智慧运营”新模式

近日&#xff0c;上海笛量智能科技有限公司正式加入飞桨技术伙伴计划&#xff0c;双方将共同努力在私域运营机器人技术及生态建设做出贡献&#xff0c;致力打造“AI私域运营”智能化新模式&#xff0c;助力产业降本增效。 上海笛量智能科技有限公司 上海笛量智能科技有限公司&a…

6款ai写作一键生成,让你的文字脱颖而出

如今AI写作工具正逐渐走入我们的生活&#xff0c;成为我们写作的得力助手&#xff0c;它们通过分析大量的文本数据和机器学习算法&#xff0c;能够快速生成文章的大纲、段落结构等&#xff0c;从而帮助我们节省写作时间&#xff0c;提高写作效率。今天&#xff0c;我将为大家介…

文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《考虑净负荷均衡的分布式光伏集群电压调控策略研究》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

Vue时间轴

之前有这样子的需求没有用第三方插件于是自己写一个简单的时间轴 时间轴滚动条并左右切换滚动条位置相对应移动 <div class"time-scrollbar"><div v-if"timeLineData.length>0" class"scrollbar-content"><div class"ar…

NextJs教程系列(四):路由loading

loading加载 loading.js 可以帮助你使用React Suspense创建一个组件, 当你在加载路由内容时&#xff0c;它会显示该加载状态组件&#xff0c;渲染完成后&#xff0c;新的内容将会自动替换。 传统ssr渲染流程 传统的ssr渲染流程&#xff0c;当用户请求一个页面时&#xff0c;服…

入门学习Python推荐书籍

. Python作为一门易学易用的编程语言&#xff0c;在近些年得到了越来越多的关注和应用。Python的开发效率极高&#xff0c;语言特性丰富&#xff0c;拓展性强。因此&#xff0c;Python成为了众多IT工程师、科研人员、数据分析师以及爱好者的首选。 那么&#xff0c;对于初学者…

led护眼灯真的能护眼吗?五大热门护眼台灯测评,不容错过!

如今&#xff0c;儿童近视率不断攀升&#xff0c;其中用眼过度疲劳已成为近视的主要诱因。学习环境中光线的适宜与否&#xff0c;直接关乎孩子眼睛的疲劳程度。因此&#xff0c;为孩子营造一个舒适、健康的学习环境显得尤为关键。而一款优质的护眼台灯&#xff0c;正是预防近视…