手把手搭建springboot项目06-springboot整合RabbitMQ及其原理和应用场景

news2024/11/18 10:38:30

目录

  • 前言
    • 工作流程-灵魂画手
    • 名词解释
    • 交换机类型
  • 一、安装
    • 1.1 [RabbitMQ官网安装](https://www.rabbitmq.com/download.html)
    • 1.2 Docker安装并启动
  • 二、食用教程
    • 2.1.导入依赖
    • 2.2 添加配置
    • 2.3 代码实现
      • 2.3.1 直连(Direct)类型
      • 2.3.2 引入消息手动确认机制
      • 2.3.2 广播(Fanout)类型
      • 2.3.3 主题(Topic)类型
  • 三、实战应用场景
    • 3.1 如何控制消息有序
    • 3.2 保证消息不被重复消费(幂等性)
    • 3.3 保证消息的可靠性
    • 3.4 死信队列解决订单超时未支付
  • 总结


前言

RabbitMQ是一个由erlang语言开发,实现了AMQP(Advanved Message Queue Protocol)高级消息队列协议的消息服务中间件。

工作流程-灵魂画手

工作流程
1、生产者(Producer)和消费者(Consumer)都需要在与RabbitMQ建立长连接(Connection)的前提下,才能收发消息
2、客户端(生产者、消费者)和服务端(RabbitMQ)只能建立一条长连接,在长连接中开辟一条条的信道进行收发消息
3、生产者发送消息,消息到达Broker指定虚拟主机(服务会配置)的指定交换机(发送消息会指定),根据路由键和交换机与队列的绑定关系,把消息发送给对应的队列
3、消费者通过信道监听队列,消息进入队列就可以被消费者实时拿到

名词解释

1、Broker (message broker) 消息代理:消息队列服务器实体,简单理解为邮局,寄收件都要通过它。
2、JMS(Java Message Service)JAVA消息服务。是基于JVM消息代理的规范。ActiveMQHornetMQ是JMS实现
3、AMQP(Advanced Message Queuing Protocol)
高级消息队列协议,也是一个消息代理的规范,兼容JMS
RabbitMQ是AMQP的实现
在这里插入图片描述
4、Message 消息,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
5、Producer消息的生产者,也是一个向交换器发布消息的客户端应用程序。
6、Exchange 交换机 ,用来接收生产者发送的消息,并将这些消息路由给服务器中的队列。
Exchange常用有3种类型:direct、fanout、 topic、不同类型的Exchange转发消息的策略有所区别
7、Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
8、Binding绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange 和 Queue 的绑定可以是多对多的关系。
9、Connection 网络连接,比如一个TCP连接。
10、Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
11、Consumer 消费者,表示一个从消息队列中取得消息的客户端应用程序。
12、Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

交换机类型

在这里插入图片描述


一、安装

1.1 RabbitMQ官网安装

1.2 Docker安装并启动

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management

# 开机自启
docker update rabbitmq --restart=always

● 5672 (AMQP端口)
● 15672 (web管理后台端口)

本地安装可通过:http://127.0.0.1:15672/访问,用户名密码默认guest

二、食用教程

2.1.导入依赖

<!--RabbitMQ-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 添加配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root #用户名 默认guest
    password: root #密码 默认guest
    virtual-host: springboot-test #虚拟主机 默认/

2.3 代码实现

2.3.1 直连(Direct)类型

直连型交换机,根据消息携带的路由键将消息投递给对应队列。

直连类型初始化配置

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 1、直连交换机配置
 */
@Configuration
public class DirectRabbitConfig {

    public static final String DIRECT_QUEUE = "===DirectQueue===";
    public static final String DIRECT_EXCHANGE = "===DirectExchange===";
    public static final String DIRECT_ROUTING = "===DirectRouting===";

    /**
     *      durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
     *      exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
     *      autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
     *      return new Queue("TestDirectQueue",true,true,false);
     */
    @Bean
    public Queue directQueue() {
        return new Queue(DIRECT_QUEUE,false);
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE,false,false);
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(directQueue())
                .to(directExchange()).with(DIRECT_ROUTING);
    }
}

该配置主要把队列、交换机、绑定都交由spring管理,记得声明队列、交换机、建立绑定关系。消息指定交换机发送后,交换机就可以根据路由键把消息发送到匹配的队列上。

消费者

import com.chendi.springboot_rabbitmq.config.DirectRabbitConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Slf4j
@Component
public class DirectReceiver {

    @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)
    public void receiver(String dataMsg) {
        log.info("接收者A dataMsg:{} ",dataMsg);
    }

    @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)
    public void receiver(String dataMsg) {
        log.info("接收者B dataMsg:{} ",dataMsg);
    }
}

生产者

@RestController
@RequiredArgsConstructor
public class RabbitMQTestController {

    final RabbitTemplate rabbitTemplate;

    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        for (int i = 0; i < 10; i++) {
            String messageData = "Hello World!" + i;
            //可自定义消息体类型
            rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE, DirectRabbitConfig.DIRECT_ROUTING, messageData);
        }
        return "发送完成";
    }
}

运行发现:默认情况下,RabbitMQ轮询分发将按顺序将每个消息发送给下一个使用者。有如下缺点:
1、无法保证消息已被消费
2、处理消息快的服务得到的消息和处理消息慢的服务是一样多的(公平分发、能者多劳)。

2.3.2 引入消息手动确认机制

配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 设置消费端手动 ack
        #表示消费者端每次从队列拉取多少个消息进行消费,直到手动确认消费完毕后,才会继续拉取下一条
        prefetch: 1 # 预加载消息数量--QOS

消费者应答

@Slf4j
@Component
public class DirectReceiver {

    @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)
    public void receiver(String dataMsg, Channel channel, Message message) throws IOException, InterruptedException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        Thread.sleep(1000);
        log.info("接收者A deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
        channel.basicAck(deliveryTag,true);
    }

    @RabbitListener(queues = DirectRabbitConfig.DIRECT_QUEUE)
    public void receiver2(String dataMsg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("接收者B deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
        channel.basicAck(deliveryTag,true);
    }
}

回执方法(
1、channel.basicAck表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
2、channel.basicNack表示失败确认,一般在消费消息业务异常时用到此方法、可决定消息是否重新入列
3、channel.basicReject 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

2.3.2 广播(Fanout)类型

扇型交换机,这个交换机没有路由键概念,这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

广播类型配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 2、广播、扇出交换机
 */
@Configuration
public class FanoutRabbitConfig {
    public final static String FANOUT_EXCHANGE = "fanoutExchange";
    public static final String FANOUT_QUEUE_A = "fanoutQueueA";
    public static final String FANOUT_QUEUE_B = "fanoutQueueB";
    public static final String FANOUT_QUEUE_C = "fanoutQueueC";
    /**
     *  创建三个队列
     *  将三个队列都绑定在交换机 fanoutExchange 上
     *  因为是扇型交换机, 路由键无需配置,配置也不起作用
     */
    @Bean
    public Queue queueA() {
        return new Queue(FANOUT_QUEUE_A);
    }
    @Bean
    public Queue queueB() {
        return new Queue(FANOUT_QUEUE_B);
    }
    @Bean
    public Queue queueC() {
        return new Queue(FANOUT_QUEUE_C);
    }
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }
    @Bean
    Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }
    @Bean
    Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}

消费者

import com.chendi.springboot_rabbitmq.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//如果开启了消息手动确认机制,一定要记得应答消息噢
//不然消息会一直堆积在mq里
@Slf4j
@Component
public class FanoutReceiver {
    @RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_A)
    public void fanout_A(String message) {
        log.info("fanout_A  {}" , message);
    }

    @RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_B)
    public void fanout_B(String message) {
        log.info("fanout_B  {}" , message);
    }

    @RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_C)
    public void fanout_C(String message) {
        log.info("fanout_C  {}" , message);
    }
}

测试生产者 Controller加上

@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
    String messageData = "这是一条广播消息";
    rabbitTemplate.convertAndSend(FanoutRabbitConfig.FANOUT_EXCHANGE, "", messageData);
    return "发送完成";
}

2.3.3 主题(Topic)类型

主题交换机,特点就是在它的路由键和绑定键之间是有规则的。

「*」 (星号) 用来表示一个单词 (必须出现的)
「#」 (井号) 用来表示任意数量(零个或多个)单词
主题交换机不绑定路由键时是直连交换机,绑定「#」号时是扇形交换机。

主题模式配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 主题交换机
 * 转发规则:
 * #:匹配一个或者多个词
 * *:匹配一个或者0个词
 * 比如 有msg.# 、msg.* 匹配规则
 * msg.# 会匹配 msg.email、msg.email.b、msg.email.a
 * msg.* 只会匹配 msg.email 和 msg ,
 */
@Configuration
public class TopicRabbitConfig {
    //绑定键
    public final static String MSG_EMAIL = "msg.email";
    public final static String MSG_EMAIL_A = "msg.email.a";
    public final static String MSG_SMS = "msg.sms";
    public final static String TOPIC_EXCHANGE = "topicExchange";
    @Bean
    public Queue firstQueue() {
        return new Queue(TopicRabbitConfig.MSG_EMAIL);
    }
    @Bean
    public Queue secondQueue() {
        return new Queue(TopicRabbitConfig.MSG_EMAIL_A);
    }
    @Bean
    public Queue thirdQueue() {
        return new Queue(TopicRabbitConfig.MSG_SMS);
    }
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(MSG_EMAIL);
    }
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("msg.#");
    }
    @Bean
    Binding bindingExchangeMessage3() {
        return BindingBuilder.bind(thirdQueue()).to(exchange()).with("msg.*");
    }
}

消费者

import com.chendi.springboot_rabbitmq.config.TopicRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TopicReceiver {
    @RabbitListener(queues = TopicRabbitConfig.MSG_EMAIL)
    public void topic_man(String message) {
        log.info("队列{} 收到消息:{}" ,TopicRabbitConfig.MSG_EMAIL, message);
    }
    @RabbitListener(queues = TopicRabbitConfig.MSG_SMS)
    public void topic_woman(String message) {
        log.info("队列{} 收到消息:{}" ,TopicRabbitConfig.MSG_SMS, message);
    }
    @RabbitListener(queues = TopicRabbitConfig.MSG_EMAIL_A)
    public void xxx(String message) {
        log.info("队列{} 收到消息:{}" ,TopicRabbitConfig.MSG_EMAIL_A, message);
    }
}

测试生产者 Controller加上

@GetMapping("/sendTopicMessage")
public String sendTopicMessage() {
    rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_EMAIL, "Hello Topic!所有队列都可以收到这条信息");
    rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_EMAIL_A, "只有 msg.email.a可以收到这条信息");
    rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.MSG_SMS, "msg.email.a 和 msg.sms可以收到这条信息");
    return "发送完成";
}

如果开启了消息手动确认机制,一定要记得应答消息噢!!!


以上整合就完成了。

三、实战应用场景

3.1 如何控制消息有序

1、当只有一个消费者可以保证消息有序,但是效率低。
2、生产者顺序发送消息到队列但是多个消费者监听一个队列时会轮询分发导致乱序。修改为一个消费者只监听一个队列,生产者自定义投放策略,1、2、3投放到A队列,4、5、6投放到B队列(顺序的消息为一个整体)投放至一个队列。

3.2 保证消息不被重复消费(幂等性)

在消费者消费结束后,正常情况下会发送回执给消息队列,证明该消息已被消费。但是此时消费者网络传输故障或者宕机了,消息队列收不到消息被消费的回执会将消息再分发给其他消费者,进而导致消息被消费多次。
·······
解决方法:(具体问题具体分析)
1、在redis中维护一个set,生产者在发送消息前,加上全局唯一的id,消费者消费之前,去redis中查一下,看是否消费过,如果没有消费过则继续执行。

//生产者
public void sendMessageIde() {
    MessageProperties properties = new MessageProperties();
    properties.setMessageId(UUID.randomUUID().toString());
    Message message = new Message("消息".getBytes(), properties);
    rabbitTemplate.convertAndSend("exchange", "", message);
}

//消费者
@RabbitListener(queues = "queue")
@RabbitHandler
public void processIde(Message message, Channel channel) throws IOException {
    if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),"1")){
        // 业务操作...
        System.out.println("消费消息:"+ new String(message.getBody(), "UTF-8"));
        // 手动确认   
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

3.3 保证消息的可靠性

消息发送流程
消息发送流程
可以看出,生产者发送的消息准确抵达消费者分为两部分
1、发送端 :消息投递到Broker成功时回调confirmCallback,交换机投递到队列失败时回调returnCallback
2、消费端的ack

配置文件

spring:
  rabbitmq:
    publisher-returns: true # 开启消息抵达队列的确认  
    # 低版本 publisher-confirms: true
    publisher-confirm-type: correlated # 开启发送端确认

配置类

/**
 * 常用的三个配置如下
 * 1---设置手动应答(acknowledge-mode: manual)
 * 2---设置生产者消息发送的确认回调机制 (  #这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调
 *     publisher-confirm-type: correlated
 *     #保证交换机能把消息推送到队列中
 *     publisher-returns: true
 *      template:
 *       #以下是rabbitmqTemplate配置
 *       mandatory: true)
 *  3---设置重试
 */
@Slf4j
@Configuration
public class RabbitConfig {
    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    // 存在此名字的bean 自带的容器工厂会不加载(yml下rabbitmq下的template的配置),如果想自定义来区分开 需要改变bean 的名称
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(rabbitConnectionFactory);
        //默认是用jdk序列化
        //数据转换为json存入消息队列,方便可视化界面查看消息数据
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        //此处设置重试template后,会再生产者发送消息的时候,调用该template中的调用链
        rabbitTemplate.setRetryTemplate(rabbitRetryTemplate());
        rabbitTemplate.setConfirmCallback(
                (correlationData, ack, cause) -> {
                    if(!ack){
                        System.out.println("ConfirmCallback     "+"相关数据:"+  correlationData);
                        System.out.println("ConfirmCallback     "+"确认情况:"+ ack);
                        System.out.println("ConfirmCallback     "+"原因:"+ cause);
                    }
                });
        rabbitTemplate.setReturnsCallback((ReturnedMessage returned) -> {
            System.out.println("ReturnsCallback:     "+"消息:"+ returned.getMessage());
            System.out.println("ReturnsCallback:     "+"回应码:"+ returned.getReplyCode());
            System.out.println("ReturnsCallback:     "+"回应消息:"+ returned.getReplyText());
            System.out.println("ReturnsCallback:     "+"交换机:"+ returned.getExchange());
            System.out.println("ReturnsCallback:     "+"路由键:"+ returned.getRoutingKey());
        });
        return rabbitTemplate;
    }

    //重试的Template
    @Bean
    public RetryTemplate rabbitRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        // 设置监听  调用重试处理过程
        retryTemplate.registerListener(new RetryListener() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
                // 执行之前调用 (返回false时会终止执行)
                //log.info("执行之前调用 (返回false时会终止执行)");
                return true;
            }
            @Override
            public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                // 方法结束的时候调用
                if(retryContext.getRetryCount() > 0){
                    log.info("最后一次调用");
                }
            }
            @Override
            public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                // 方法异常时会调用
                log.info("第{}次调用", retryContext.getRetryCount());
            }
        });
        return retryTemplate;
    }
}

发送端测试

import com.chendi.springboot_rabbitmq.config.DirectRabbitConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;

@RestController
public class SendCallbackMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    @ResponseBody
    @GetMapping("/sendMessageToExchangeFail")
    public Object sendMessageToExchangeFail() {
        String messageData = "这条消息不会到达交换机";
        rabbitTemplate.convertAndSend("不存在的交换机", "", messageData, new CorrelationData(UUID.randomUUID().toString()));
        return messageData;
    }
    @ResponseBody
    @GetMapping("/sendMessageToQueueFail")
    public Object sendMessageToQueueFail() {
        String messageData = "这条消息不会到达队列";
        rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE, "不存在的路由键", messageData, new CorrelationData(UUID.randomUUID().toString()));
        return messageData;
    }
}

请求结果:
在这里插入图片描述

3.4 死信队列解决订单超时未支付

场景:当顾客购买一件商品存在的操作
生成订单 =》 扣减库存 =》 完成支付
当库存只剩1件时,A用户下单但是迟迟未支付,会导致B用户下单时,判断库存不足导致生成订单失败。
此时,就需要解决订单超时未支付的问题。

流程 :
初始化两组正常队列和交换机A、B,A组的初始化参数x-dead-letter-exchange、x-dead-letter-routing-key指向B组的交换机和路由键。意在,A中删除或过期的数据,可以放入指定交换机指定路由键的队列中。
-这样如果设置了订单超过5min未支付
发送方在发送消息时,指定过期时间为5 * 60 * 1000
时间过期后此消息会投递到队列B(死信队列)中,队列B根据订单id去判断是否支付,去做加库存等相应的操作。

死信队列配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 解决订单超时未支付的问题
 *
 * 创建两个队列
 * 1、队列A(正常的队列只是设置了某些参数):设置队列中的超时未消费信息指定丢到对应的队列B
 * 2、队列B(也是一个正常的队列),只是把超时的信息丢给它所以称呼为死信队列
 */

@Configuration
public class DeadLetterExchangeConfig {

    /**
     * x-message-tti(Time-To-Live)发送到队列的消息在丟弃之前可以存活多长时间(毫秒)
     * x-max-length限制队列最大长度(新增后挤出最早的),单位个数
     * x-expires队列没有访问超时时,自动删除(包含没有消费的消息),单位毫秒
     * x-max-length-bytes限制队列最大容量
     * x-dead-letter-exchange死信交换机,将删除/过期的数据,放入指定交换机
     * x-dead-letter-routing-key死信路由,将删除/过期的数据,放入指定routingKey
     * x-max-priority队列优先级
     * x-queue-mode对列模式,默认lazy(将数据放入磁盘,消费时放入内存)
     * x-queue-master-locator镜像队列
     */
    @Bean
    public Queue orderQueue(){
        Map<String, Object> args = new HashMap<>(2);
        // 绑定我们的死信交换机
        args.put("x-dead-letter-exchange", "orderDeadExChange");
        // 绑定我们的路由key
        args.put("x-dead-letter-routing-key", "orderDeadRoutingKey");
        return new Queue("orderQueue", true, false, false, args);
    }
    @Bean
    public Queue orderDeadQueue(){
        return new Queue("orderDeadQueue");
    }
    @Bean
    public DirectExchange orderExchange(){
        return new DirectExchange("orderExchange");
    }
    @Bean
    public DirectExchange orderDeadExchange(){
        return new DirectExchange("orderDeadExChange");
    }
    //绑定正常队列到交换机
    @Bean
    public Binding orderBindingExchange(Queue orderQueue, DirectExchange orderExchange) {
        return BindingBuilder.bind(orderQueue).to(orderExchange).with("orderRoutingKey");
    }
    //绑定死信队列到死信交换机
    @Bean
    public Binding deadBindingExchange(Queue orderDeadQueue,  DirectExchange orderDeadExchange) {
        return BindingBuilder.bind(orderDeadQueue).to(orderDeadExchange).with("orderDeadRoutingKey");
    }
}

消费者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * 死信队列的消费者
 */
@Slf4j
@Component
public class DeadLetterReceiver {

    @RabbitListener(queues = "orderDeadQueue")
    public void orderDeadQueueReceiver(String dataMsg, Channel channel, Message message) {
        try{
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            log.info("死信队列接收者A收到消息,根据订单id查询订单是否支付,未支付解锁库存 deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);
            channel.basicAck(deliveryTag,false);
        } catch (Exception e){
            log.info("如果报错了,执行补偿机制");
        }
    }
}

生产者

@GetMapping("/createOrder")
public String createOrder() {
    rabbitTemplate.convertAndSend("orderExchange", "orderRoutingKey", "我是订单json", message -> {
        //设置过期时间10s
        message.getMessageProperties().setExpiration("10000");
        return message;
    });
    return "发送完成";
}

总结

MQ的应用场景:

  1. 异步处理(注册发邮件发短消息)
  2. 应用解耦(用户下单后,订单系统需要通知库存系统扣减库存,就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.)
  3. 流量削峰(秒杀活动,一般会因为流量过大,导致应用挂掉,设置消息队列参数,如果长度超过最大值,则直接抛弃用户请求或跳转到错误页面)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/380704.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【保姆级】Java后端查询数据库结果导出xlsx文件+打印xlsx表格

目录前言一、需求一&#xff1a;数据库查询的数据导出成Excel表格1.1 Vue前端实现导出按钮点击事件1.2 后端根据数据库查询结果生成xlsx文件二、需求二&#xff1a;对生成的xlsx文件调用打印机打印2.1 Vue前端实现按钮事件2.2 后端实现打印前言 最近在弄一个需求&#xff0c;需…

低代码如何推动自动化未来

一项全球研究表明&#xff0c;企业平均每个月有60个小时的工作是手动完成的&#xff0c;也就是每个员工每天花3个小时完成文件归档、数据输入和报告整合&#xff0c;而这些工作都是可以通过自动化的方式完成的。 组织实现数字化转型的关键环节就是自动化。通过自动化&#xff…

温控负荷的需求响应潜力评估及其协同优化管理研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

登录Oracle数据库遇到ORA-01017密码错误的解决办法

文章目录症状分析解决办法欢迎加下方我的微信&#x1f447;&#xff0c;拉你入学习群我们在登录Oracle数据库时可能会遇到ORA-01017错误&#xff0c;这里分析原因并提供解决办法。点击试看博主的专著《MySQL 8.0运维与优化》&#xff08;清华大学出版社&#xff09; 症状 图像…

Linux rpm安装mysql

个人记录 第一步&#xff1a;卸载已安装的mysql rpm -qa | grep -i mysql 查询已安装的mysql1、确认停止mysql服务 2、删除卸载mysql –nodeps&#xff1a;表示强制卸载&#xff0c;如果因为依赖关系导致卸载不成功&#xff0c;加上强制卸载选项–nodeps rpm -ev mysql-com…

C盘爆满?两个超简单的解决办法

我们在使用电脑的过程中&#xff0c;经常容易出现C盘爆红&#xff0c;反而其他盘还有大量可用空间的情况。为什么会这样呢&#xff1f;其实主要就两种原因&#xff1a;一是电脑使用习惯不好&#xff0c;不管什么软件都默认安装在C盘&#xff0c;大文件又喜欢放在桌面&#xff0…

Python(青铜时代)——列表

列表 在Python中&#xff0c;所有 非数字型变量 都支持以下特点&#xff1a; 都是一个序列 sequence, 也可以理解为 容器 取值 [] 遍历 for in 计算长度、最大/最小值、比较、删除 链接 和 重复 * 切片 列表的定义 List (列表) 是Python 中使用 最频繁 的数据类型&#…

6.3 负反馈放大电路的方块图及一般表达式

一、负反馈放大电路的方块图表示法 任何负反馈放大电路都可以用图6.3.1所示的方块图来表示&#xff0c;上面一个方块是负反馈放大电路的基本放大电路&#xff0c;下面一个方块是反馈放大电路的反馈网络。负反馈放大电路的基本放大电路是在断开反馈且考虑了反馈网络的负载效应的…

让您的客户了解您的制造过程“VR云看厂实时数字化展示”

一、工厂云考察&#xff0c;成为市场热点虚拟现实&#xff08;VR&#xff09;全景技术问世已久&#xff0c;但由于应用范围较为狭窄&#xff0c;一直未得到广泛应用。国外客户无法亲自到访&#xff0c;从而导致考察难、产品取样难等问题&#xff0c;特别是对于大型制造企业来说…

剑指 Offer 14-剪绳子

摘要 ​​​​​​剑指 Offer 14- I. 剪绳子 剑指 Offer 14- II. 剪绳子 II 343. 整数拆分 一、动态规划解析 这道题给定一个大于1的正整数n&#xff0c;要求将n 拆分成至少两个正整数的和&#xff0c;并使这些正整数的乘积最大化&#xff0c;返回最大乘积。令x是拆分出的第…

Spark Transformation转换算子和Action行动算子

1、Transformation转换算子 RDD整体上分为Value类型、双Value类型和Key-Value类型 1.1&#xff0c;Value类型 1.1.1&#xff0c;map()映射 object value01_map {def main(args: Array[String]): Unit {//1.创建SparkConf并设置App名称val conf new SparkConf().setAppName(…

c语言入门-5-字符串

c语言入门-5-字符串正文1、字符串怎么用方式一方式二2、字符串的长度深度解析1 字符串的特性2 \0 的含义3 ascii码表下一篇正文 1、字符串怎么用 方式一 // 字符串的标准使用方式&#xff0c;用char类型的数组表示字符串 #include<stdio.h> int main() {char arr[] &…

语音识别技术对比分析

文章目录一、语音识别产品对比二、百度语音识别产品1、套餐及价格&#xff1a;2、官网3、调研结果三、华为语音识别产品四、阿里云语音识别产品1、套餐及价格&#xff1a;2、官网地址3、调研结果五、科大讯飞语音识别产品1、套餐及价格&#xff1a;2、官网3、调研结果六、有道语…

一、Redis入门概述(是什么,能干嘛,去哪下,怎么玩)

一. redis是什么&#xff1f; Redis:REmote Dictionary Server(远程字典服务器)官方解释&#xff1a; Remote Dictionary Server(远程字典服务)是完全开源的&#xff0c;使用ANSIC语言编写遵守BSD协议&#xff0c;是一个高性能的Key-Value数据库提供了丰富的数据结构&#xff…

何谓dB , dB怎么理解?

dB 是什么单位 ?愈低愈好吗?对于声频 ( 声学及电子声学 ) 方面的单位&#xff0c;它是以分贝(decibel &#xff0c;dB ) 来做结果的。斯多里一生专注于科学,1876 发明电话&#xff0c;我们都知道贝尔发明了电话&#xff0c;然而重要的是&#xff0c;他发现我们人类耳朵对声音…

一文带你了解什么是PACS系统源码

▷ 运维级带三维重建和还原的医院PACS系统有源码&#xff0c;有演示&#xff0c;带使用手册和操作说明书。 ▷ PACS系统及影像存取与传输系统( Picture Archiving and Communication System)&#xff0c;为以实现医学影像数字化存储、诊断为核心任务&#xff0c;从医学影像设备…

uniapp小程序接入腾讯地图sdk

新建一个项目。配置uniapp配置文件设置小程序的appid注意&#xff1a;匿名用户可能存在地理定位失效。查uniapp官网官网->apiuni.getLocation(OBJECT) 获取当前的地理位置、速度。属性&#xff1a;success匿名函数返回值&#xff1a;uni.getLocation({type: gcj02,success: …

工作实战之密码防重放攻击

目录 前言 一、登录认证密码加密 二、bcrypt加密密码不一样&#xff0c;匹配原理 1.程序运行现象 2.原理解释 三、密码防重放 总结 前言 密码重放攻击&#xff1a;请求被攻击者获取&#xff0c;并重新发送给认证服务器&#xff0c;从而达到认证通过的目的 一、登录认证密…

系列八、SQL优化

一、插入数据 如果我们需要一次性往数据库表中插入多条记录&#xff0c;可以从以下三个方面进行优化。1.1、优化方案一&#xff08;批量插入数据&#xff09; Insert into tb_test values(1,Tom),(2,Cat),(3,Jerry); 1.2、优化方案二&#xff08;手动控制事务&#xff09; s…

CEC2005:星雀优化算法(Nutcracker optimizer algorithm,NOA)求解CEC2005(提供MATLAB代码)

一、星雀优化算法NOA 星雀优化算法(Nutcracker optimizer algorithm,NOA)由Mohamed Abdel-Basset等人于2023年提出&#xff0c;该算法模拟星雀的两种行为&#xff0c;即&#xff1a;在夏秋季节收集并储存食物&#xff0c;在春冬季节搜索食物的存储位置。 星鸦单独或成对活动&…