RabbitMQ进阶篇

news2025/1/9 17:09:23

文章目录

  • 发送者的可靠性
    • 生产者重试机制
    • 实现生产者确认
  • MQ的可靠性
    • 数据持久化
      • 交换机持久化
      • 队列持久化
      • 消息持久化
    • Lazy Queue(可配置~)
      • 控制台配置Lazy模式
      • 代码配置Lazy模式
      • 更新已有队列为lazy模式
  • 消费者的可靠性
    • 消费者确认机制
    • 失败重试机制
    • 失败处理策略
  • 业务幂等性
    • 唯一消息ID
    • 业务判断
    • 兜底方案
  • 延迟消息
    • 死信交换机(不推荐使用)
    • 延迟消息
    • DelayExchange插件(推荐)
      • 插件下载地址:
      • 安装:
      • 声明延迟交换机
      • 发送延迟消息
    • 超时订单问题
      • 定义常量
      • 抽取共享mq配置
      • 改造下单业务
      • 编写查询支付状态接口
      • 消息监听

发送者的可靠性

保证一个消息发送出去,至少被消费一次。
image.png
可能在多个步骤中给消息弄丢了

生产者重试机制

不建议使用, 会增加网络和资源的消耗

第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断
当RabbitTemplate与MQ连接超时后,多次重试
修改publisher模块的application.yaml文件,添加下面的内容:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

实现生产者确认

image.png

  • ConfirmCallback为发送Exchange(交换器)时回调,成功或者失败都会触发;
  • ReturnCallback为路由不到队列时触发,成功则不触发;

Rabbitmq之ConfirmCallback与ReturnCallback使用_rabbitmq returncallback-CSDN博客
在publisher模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

定义ReturnCallback:
image.png

package com.itheima.publisher.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}

image.png
image.png
定义ConfirmCallback:
由于每个消息发送时的处理逻辑不一定相同,因此**ConfirmCallback需要在每次发消息时定义。**具体来说,是在调用RabbitTemplate中的convertAndSend方法 时,多传递一个参数:

image.png

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

image.png

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback:

@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData,需要一个UUID,回调的时候通过id识别
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

image.png

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了

image.png

MQ的可靠性

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化
image.png

  • 交换机持久化
  • 队列持久化
  • 消息持久化

数据持久化

交换机持久化

image.png

设置为Durable就是持久化模式,Transient就是临时模式。

队列持久化

image.png

消息持久化

image.png
image.png

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

当内存占满, page out会影响MQ阻塞

Lazy Queue(可配置~)

image.png

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。官方推荐所有队列都为LazyQueue模式。

控制台配置Lazy模式

image.png

代码配置Lazy模式

在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:

@Bean
public Queue lazyQueue(){
    return QueueBuilder
    .durable("lazy.queue")
    .lazy() // 开启Lazy模式
    .build();
}

当然,我们也可以基于注解来声明队列并设置为Lazy模式:

@RabbitListener(queuesToDeclare = @Queue(
    name = "lazy.queue",
    durable = "true",					
    arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}

更新已有队列为lazy模式

可以基于命令行设置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

当然,也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:
image.png

消费者的可靠性

  • 消息投递的过程中出现了网络故障
  • 消费者接收到消息后突然宕机
  • 消费者接收到消息后,因处理不当导致异常

消费者确认机制

当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回

    • 如果是业务异常,会自动返回nack;
    • 如果是消息处理或校验异常,自动返回reject;

通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 不做处理

失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃

失败处理策略

Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

在consumer服务中定义处理失败消息的交换机和队列
定义一个RepublishMessageRecoverer,关联队列和交换机
image.png

/**
 * 错误消息配置类,用于配置 RabbitMQ 错误消息处理相关的 Bean。
 * 当前类会根据 spring.rabbitmq.listener.simple.retry.enabled 属性的值来决定是否创建相关的 Bean。
 * 如果该属性值为 true,则会创建错误消息交换机、错误队列和绑定关系,并配置消息恢复器。
 */
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {

    /**
     * 创建错误消息交换机
     */
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    /**
     * 创建错误队列
     */
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }

    /**
     * 创建错误队列与错误消息交换机的绑定关系,"error"是路由键
     */
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    /**
     * 创建消息恢复器
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

消费者如何保证消息一定被消费?

  • 开启消费者确认机制为auto, 由spring确认消息处理成功后返回ack, 异常时返回nack
  • 开启消费者失败重试机制, 并设置MessageRecoverer, 多次重试失败后将信息投递到异常交换机

业务幂等性

image.png
保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断

唯一消息ID

  • 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  • 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可
以Jackson的消息转换器为例( 加在启动类 ):

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);// 在底层会自动创建一个UUID
    return jjmc;
}

在Spring AMQP中,当使用Jackson2JsonMessageConverter并开启setCreateMessageIds(true)功能时,底层会自动在消息的属性中添加一个名为amqp_messageId的字段,其值为自动生成的UUID。
具体来说,UUID会成为消息的一部分,保存在消息的AMQP(Advanced Message Queuing Protocol)属性中。这些属性是与消息一起传递的元数据,包含了关于消息的一些信息。amqp_messageId字段就是用于唯一标识消息的UUID。
当消息被发送给消费者时,消费者可以通过**message.getMessageProperties().getMessageId()**方法来获取消息的ID,然后根据业务需求将该ID保存到数据库中。在处理相同消息时,消费者可以在数据库中查询是否存在相同的消息ID,以判断是否为重复消息。

业务判断

image.png
image.png
相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。

@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

image.png

兜底方案

既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

image.png

图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

image.png

死信交换机(不推荐使用)

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

死信交换机有什么作用呢?

  • 收集那些因处理失败而被拒绝的消息
  • 收集那些因队列满了而被拒绝的消息
  • 收集因TTL(有效期)到期的消息

延迟消息

RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。

当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

DelayExchange插件(推荐)

插件下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安装:

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:
image.png

[
  {
    "CreatedAt": "2024-01-19T09:22:59+08:00",
    "Driver": "local",
    "Labels": null,
    "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
    "Name": "mq-plugins",
    "Options": null,
    "Scope": "local"
  }
]

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,启用插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

声明延迟交换机

基于注解方式:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "delay.queue", durable = "true"),
    exchange = @Exchange(name = "delay.direct", delayed = "true"),
    key = "delay"	
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

package com.itheima.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
        .directExchange("delay.direct") // 指定交换机类型和名称
        .delayed() // 设置delay的属性为true
        .durable(true) // 持久化
        .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }

    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

可以写一个延迟时间的类, 不用每次都new一个,具体代码如下:
image.png

/**
 * @author Ccoo
 * 2024/1/22
 */
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {

	private final int delay;

	@Override
	public Message postProcessMessage(Message message) throws AmqpException {
		message.getMessageProperties().setDelay(delay);
		return message;
	}
}

最后上面的业务代码变为:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message,
                                 new DelayMessageProcessor(message.removeNextDelay().intValue());
} 

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。因此,不建议设置延迟时间过长的延迟消息

超时订单问题

image.png

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体,处于通用性考虑,我们将其定义到hm-common模块下:

image.png

@Data
public class MultiDelayMessage<T> {
	/**
	 * 消息体
	 */
	private T data;
	/**
	 * 记录延迟时间的集合
	 */
	private List<Long> delayMillis;

	public MultiDelayMessage(T data, List<Long> delayMillis) {
		this.data = data;
		this.delayMillis = delayMillis;
	}
	public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){
		return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
	}

	/**
	 * 获取并移除下一个延迟时间
	 * @return 队列中的第一个延迟时间
	 */
	public Long removeNextDelay(){
		return delayMillis.remove(0);
	}

	/**
	 * 是否还有下一个延迟时间
	 */
	public boolean hasNextDelay(){
		return !delayMillis.isEmpty();
	}
}

定义常量

image.png

/**
 * @author Ccoo
 * 2024/1/22
 */
public interface MqConstants {
	String DELAY_EXCHANGE = "trade.delay.topic";
	String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
	String DELAY_ORDER_ROUTING_KEY = "order.query";
}

抽取共享mq配置

在nacos中定义一个名为shared-mq.xml的配置文件,内容如下:

spring: 
  rabbitmq:
    host: ${hm.mq.host:192.168.164.128} # 主机名
    port: ${hm.mq.port:5672} # 端口
    virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机
    username: ${hm.mq.un:itheima} # 用户名
    password: ${hm.mq.pw:123321} # 密码
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

在trade-service模块添加共享配置:
image.png

改造下单业务

  1. 引入依赖

在trade-service模块的pom.xml中引入amqp的依赖:

<!--amqp-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  1. 改造下单业务

image.png

编写查询支付状态接口

首先,在hm-api模块定义三个类:

image.png

说明:

  • PayOrderDTO:支付单的数据传输实体
  • PayClient:支付系统的Feign客户端
  • PayClientFallback:支付系统的fallback逻辑
@Data
@ApiModel(description = "支付单数据传输实体")
public class PayOrderDTO {
	@ApiModelProperty("id")
	private Long id;
	@ApiModelProperty("业务订单号")
	private Long bizOrderNo;
	@ApiModelProperty("支付单号")
	private Long payOrderNo;
	@ApiModelProperty("支付用户id")
	private Long bizUserId;
	@ApiModelProperty("支付渠道编码")
	private String payChannelCode;
	@ApiModelProperty("支付金额,单位分")
	private Integer amount;
	@ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付")
	private Integer payType;
	@ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功")
	private Integer status;
	@ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段")
	private String expandJson;
	@ApiModelProperty("第三方返回业务码")
	private String resultCode;
	@ApiModelProperty("第三方返回提示信息")
	private String resultMsg;
	@ApiModelProperty("支付成功时间")
	private LocalDateTime paySuccessTime;
	@ApiModelProperty("支付超时时间")
	private LocalDateTime payOverTime;
	@ApiModelProperty("支付二维码链接")
	private String qrCodeUrl;
	@ApiModelProperty("创建时间")
	private LocalDateTime createTime;
	@ApiModelProperty("更新时间")
	private LocalDateTime updateTime;
}
@FeignClient(value = "pay-service", fallbackFactory = PayClientFallback.class)
public interface PayClient {
	/**
	 * 根据交易订单id查询支付单
	 * @param id 业务订单id
	 * @return 支付单信息
	 */
	@GetMapping("/pay-orders/biz/{id}")
	PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}
@Slf4j
public class PayClientFallback implements FallbackFactory<PayClient> {
	@Override
	public PayClient create(Throwable cause) {
		return new PayClient() {
			@Override
			public PayOrderDTO queryPayOrderByBizOrderNo(Long id) {
				return null;
			}
		};
	}
}

最后,在pay-service模块的PayController中实现该接口:

@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){
    PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();
    return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}

消息监听

接下来,我们在trader-service编写一个监听器,监听延迟消息,查询订单支付状态:
image.png

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusListener {

    private final IOrderService orderService;

    private final PayClient payClient;

    private final RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
            exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),
            key = MqConstants.DELAY_ORDER_ROUTING_KEY
    ))
    public void listenOrderCheckDelayMessage(MultiDelayMessage<Long> msg) {
        // 1.获取消息中的订单id
        Long orderId = msg.getData();
        // 2.查询订单,判断状态:1是未支付,大于1则是已支付或已关闭
        Order order = orderService.getById(orderId);
        if (order == null || order.getStatus() > 1) {
            // 订单不存在或交易已经结束,放弃处理
            return;
        }
        // 3.可能是未支付,查询支付服务
        PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);
        if (payOrder != null && payOrder.getStatus() == 3) {
            // 支付成功,更新订单状态
            orderService.markOrderPaySuccess(orderId);
            return;
        }
        // 4.确定未支付,判断是否还有剩余延迟时间
        if (msg.hasNextDelay()) {
            // 4.1.有延迟时间,需要重发延迟消息,先获取延迟时间的int值
            int delayVal = msg.removeNextDelay().intValue();
            // 4.2.发送延迟消息
            rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
                    message -> {
                        message.getMessageProperties().setDelay(delayVal);
                        return message;
                    });
            return;
        }
        // 5.没有剩余延迟时间了,说明订单超时未支付,需要取消订单
        orderService.cancelOrder(orderId);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1887110.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

编译libvlccpp

首先下载vlc sdk https://get.videolan.org/vlc/3.0.9.2/win64/vlc-3.0.9.2-win64.7z Cmake 生成libvlccpp vs2022工程文件 编译libvlccpp 编译出错需修改代码 错误信息&#xff1a; \VLC\sdk\include\vlc/libvlc_media.h(368): error C2065: “libvlc_media_read_cb”: 未…

Linux高并发服务器开发(九)Tcp状态转移和IO多路复用

文章目录 0 包裹函数1 多进程服务器流程代码 2 多线程服务器3 TCP状态转移半关闭心跳包 4 端口复用5 IO多路复用技术高并发服务器 6 select代码总结 7 POLLAPI代码poll相对select的优缺点 8 epoll&#xff08;重点&#xff09;API监听管道代码EPOLL 高并发服务器 9 Epoll的两种…

【MySQL备份】Percona XtraBackup加密备份实战篇

目录 1.前言 2.准备工作 2.1.环境信息 2.2.配置/etc/my.cnf文件 2.3.授予root用户BACKUP_ADMIN权限 2.4.生成加密密钥 2.5.配置加密密钥文件 3.加密备份 4.优化加密过程 5.解密加密备份 6.准备加密备份 7.恢复加密备份 7.1.使用rsync进行恢复 7.2.使用xtrabackup命令恢…

go Channel原理 (四)

Channel 设计原理 不要通过共享内存的方式进行通信&#xff0c;而是应该通过通信的方式共享内存。 在主流编程语言中&#xff0c;多个线程传递数据的方式一般都是共享内存。 Go 可以使用共享内存加互斥锁进行通信&#xff0c;同时也提供了一种不同的并发模型&#xff0c;即通…

终极指南:RNNS、Transformers 和 Diffusion 模型

一、说明 作为广泛使用这些工具和模型的人&#xff0c;我的目标是解开 RNN、Transformer 和 Diffusion 模型的复杂性和细微差别&#xff0c;为您提供详细的比较&#xff0c;为您的特定需求提供正确的选择。 无论您是在构建语言翻译系统、生成高保真图像&#xff0c;还是处理时间…

【ACM出版,马来西亚-吉隆坡举行】第四届互联网技术与教育信息化国际会议 (ITEI 2024)

作为全球科技创新大趋势的引领者&#xff0c;中国不断营造更加开放的科技创新环境&#xff0c;不断提升学术合作的深度和广度&#xff0c;构建惠及各方的创新共同体。这是对全球化的新贡献&#xff0c;是构建人类命运共同体的新贡献。 第四届互联网技术与教育信息化国际学术会议…

【CSAPP】-binarybomb实验

目录 实验目的与要求 实验原理与内容 实验设备与软件环境 实验过程与结果&#xff08;可贴图&#xff09; 操作异常问题与解决方案 实验总结 实验目的与要求 1. 增强学生对于程序的机器级表示、汇编语言、调试器和逆向工程等方面原理与技能的掌握。 2. 掌握使用gdb调试器…

【ONLYOFFICE】| 桌面编辑器从0-1使用初体验

目录 一. &#x1f981; 写在前面二. &#x1f981; 在线使用感受2.1 创建 ONLYOFFICE 账号2.2 编辑pdf文档2.3 pdf直接创建表格 三. &#x1f981; 写在最后 一. &#x1f981; 写在前面 所谓桌面编辑器就是一种用于编辑文本、图像、视频等多种自媒体的软件工具&#xff0c;具…

.NET周刊【6月第5期 2024-06-30】

国内文章 呼吁改正《上海市卫生健康信息技术应用创新白皮书》 C# 被认定为A 组件 的 错误认知 https://www.cnblogs.com/shanyou/p/18264292 近日&#xff0c;《上海市卫生健康“信息技术应用创新”白皮书》发布&#xff0c;提到医疗信创核心应用适配方法及公立医院信息系统…

thinksboard新建table表格

html文件 <div fxFlex fxLayoutAlign"left top" style"display: block"> <!-- <mat-card appearance"raised" style"max-height: 80vh; overflow-y: auto;">--><div><button mat-raised-button (click)&…

linux和mysql基础指令

Linux中nano和vim读可以打开记事文件。 ifdown ens33 ifup ens33 关闭&#xff0c;开启网络 rm -r lesson1 gcc -o code1 code1.c 编译c语言代码 ./code1 执行c语言代码 rm -r dir 删除文件夹 mysql> show databases-> ^C mysql> show databases; -------…

基于STM32的卫星GPS路径记录仪

目录 引言环境准备卫星GPS路径记录仪基础代码实现&#xff1a;实现卫星GPS路径记录仪 4.1 数据采集模块4.2 数据处理与分析4.3 存储系统实现4.4 用户界面与数据可视化应用场景&#xff1a;路径记录与分析问题解决方案与优化收尾与总结 1. 引言 卫星GPS路径记录仪通过使用STM…

第十四届蓝桥杯省赛C++B组A题【日期统计】题解(AC)

题目大意 给定的字符串中&#xff0c;有几个子序列是 2023 2023 2023 年的日期&#xff08;每个日期只能算一次&#xff09;。 法一 枚举所有长度为 8 8 8 的子序列&#xff0c;判断是否是有效日期并去重&#xff0c;时间复杂度 O ( C 100 8 ) O(C^8_{100}) O(C1008​)。…

秋招Java后端开发冲刺——并发篇1(线程与进程、多线程)

一、进程 1. 进程 进程是程序的一次动态执行过程&#xff0c;是操作系统资源分配的基本单位。 2. 进程和线程的区别 特性进程线程定义独立运行的程序实例&#xff0c;资源分配的基本单位进程中的一个执行单元&#xff0c;CPU调度的基本单位资源进程拥有独立的内存空间和资源线…

Java_多线程:线程池

1、线程池优点&#xff1a; 降低资源消耗&#xff1a;通过重复利用已创建的线程降低线程创建和销毁造成的消耗。提高响应速度&#xff1a;当任务到达时&#xff0c;任务可以不需要等到线程创建就能立即执行。提高线程的可管理性&#xff1a;线程是稀缺资源&#xff0c;如果无限…

c语言的烫烫烫烫烫??

当初学习C语言时&#xff0c;对于一些特殊的打印输出可能会感到困惑&#xff0c;比如会出现一堆乱码烫烫烫的情况。其实这是因为在C语言中&#xff0c;对于字符类型和数字类型之间的隐式转换可能会导致打印输出的结果不符合预期。这并不意味着程序员"烫"&#xff0c;…

鸿蒙开发Ability Kit(程序访问控制):【安全控件概述】

安全控件概述 安全控件是系统提供的一组系统实现的ArkUI组件&#xff0c;应用集成这类组件就可以实现在用户点击后自动授权&#xff0c;而无需弹窗授权。它们可以作为一种“特殊的按钮”融入应用页面&#xff0c;实现用户点击即许可的设计思路。 相较于动态申请权限的方式&am…

Dify入门指南

一.Dify介绍 生成式 AI 应用创新引擎&#xff0c;开源的 LLM 应用开发平台。提供从 Agent 构建到 AI workflow 编排、RAG 检索、模型管理等能力&#xff0c;轻松构建和运营生成式 AI 原生应用&#xff0c;比 LangChain 更易用。一个平台&#xff0c;接入全球大型语言模型。不同…

Google地图获取位置的前端代码与测试

test.html <script src"http://maps.google.com/maps/api/js?sensorfalse"></script> <script > if (navigator.geolocation) {  console.log(Geolocation is supported!);// var startPos;var geoSuccess function(position) {startPos p…

MySQL4(事务、函数、慢查询和索引)

目录 一、MySQL事务 1. 概念 2. 事务的ACID原则 3. MySQL实现事务的方法 4. MySQL实现事务的步骤 5. 事务的原子性、一致性、持久性 6. 事务的隔离性 7. MySql中的锁 1. 共享锁 2. 排他锁 3. 行级锁 4. 表级锁 5. 间隙锁 6. 临键锁 7. 记录锁 8. 意向共享锁…