RabbitMq的基本理解

news2025/1/11 2:45:43

MQ概念及同步异步:

同步调用:

是一种编程模型,其中调用者发送请求并等待响应。在同步调用中,调用者会阻塞,直到被调用的方法返回结果。

异步调用:

是一种编程模型,其中调用者发送请求后立即返回,而不等待响应。在异步调用中,调用者不会阻塞,可以继续执行其他操作。

我的简单理解就是代码是按照顺序执行下来的就是同步调用,如果中间因为有一个函数卡住了,整个线程会阻塞只到等待这个阻塞完成,而相比之下,异步就会跳过去,先去执行下面的代码,一般说到异步都需要一个回调函数,等那个阻塞的进程结束了,就会执行这个回调函数。

消息队列:

从一个场景来引出这个消息队列:

这个是黑马商城中支付服务的流程图,支付服务完成之后,就会执行用户服务扣减余额,然后在同步进行到交易服务执行更新订单状态,后面应该还有一个这个商品服务扣减商品

整体的流程就是同步的。

这其中就存在一些问题:

第一拓展性差

我们目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。

在大多数电商业务中,用户支付成功后都会以短信或者其它方式通知用户,告知支付成功。假如后期产品经理提出这样新的需求,你怎么办?是不是要在上述业务中再加入通知用户的业务?

第二性能下降

由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和:

第三,级联失败

由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。

这其实就是同步调用的级联失败问题。

为了更好的解决上述的功能,就引出了消息队列来实现异步调用。

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方

  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器

  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

整体的流程就是,发送者执行了自己的业务之后,往这个消息队列里面发一个消息,然后消息队列就将这个消息发给接收者

说回我们刚刚那个项目背景,

当我们执行完用户服务之后,为什么要执行完用户服务呢

因为整体的业务也需要一个主任务,主任务就是支付,扣款,之后的服务都不是很重要的服务

所以,异步虽好,不过也不能什么都异步执行,

如果连上面这个扣款都异步执行,那用户都不知道自己是否已经付款成功了,也是影响用户体验。

RabbitMQ的基本安装和基本概念:

这个RabbitMQ还是使用docker部署:

第一步拉镜像:
docker pull rabbitmq:3.9-management

这里我拉取了3.9的,3.8的我一直运行不起来,我也是可服

第二步:在容器中运行:
docker run \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.9-management

这里其中有一条命令:

 -v mq-plugins:/plugins \

mq-plugins 是宿主机上的目录,/plugins 是容器内的目录

这个就是将宿主机上的 mq-plugins 挂载到容器中/plugins的目录,docker数据卷挂载的知识

安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。

账号密码初始化都是guest

也可以用docker ps来查看

基本概念:

  • publisher:生产者,也就是发送消息的一方

  • consumer:消费者,也就是消费消息的一方

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。不能存储消息

  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

出了这上面说的几种角色之外,还有一个对象就是交换机和队列的绑定关系,这个在java代码中也是一个对象(在java中万物皆可对象)

上面之前说的 一个发送方,一个消息代理,一个接收方

其实这个消息代理还包括了交换机和消息队列两个部分

在Java中的基本配置:

首当其冲的就是依赖:

        <!--amqp-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

还有一些配置文件:

spring:
  rabbitmq:
    host:  # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

 在消息的接收方和发送方都需要配置。

WorkQueues模型:

Work queues,任务模型。简单来说就是多个消费者绑定到一个队列,共同消费队列中的消息

这就是一个非常简单的模型

值得一提就是就是有一个配置

就是当一个consumer处理消息很慢的时候,我们可以在配置类中加入:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

就加入这样一个配置之后,处理快的consumer就可以获得更多的消息,类似于能者多劳,也是负载均衡。

交换机的类型:

Fanout广播交换机:

这个的就是所有和这个交换机有绑定关系的队列都能收到消息

 Direct交换机:

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

这种应该是我们在代码中用的比较多的,待会直接在代码中体现

Topic交换机:

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

就比如你指定了key为china.#

那我指定了china.aa.a.a,china.a都能匹配上

如果我指定了*.china

那我只能匹配a.china,a.a.china就不能匹配了

在Java中声明交换机和队列和消息转换器的配置 :

在Java中声明交换机和队列有两种方式:

第一种方式:
@Configuration
public class FanoutConfig {
    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }

    //声明队列1
    @Bean
    public Queue queue1(){
        return new Queue("fanout.queue1");
    }

    //声明队列2
    @Bean
    public Queue queue2(){
        return new Queue("fanout.queue2");
    }

    //绑定队列和交换机
    @Bean
    public Binding binding1(FanoutExchange fanoutExchange,Queue queue1){
        final Binding binding = BindingBuilder.bind(queue1).to(fanoutExchange);
        return binding;
    }
    //绑定队列和交换机
    @Bean
    public Binding binding2(FanoutExchange fanoutExchange,Queue queue2){
        final Binding binding = BindingBuilder.bind(queue2).to(fanoutExchange);
        return binding;
    }
}

整体的逻辑就是声明交换机

再声明两个队列

然后分别指定这个交换机和这两个队列的绑定关系,上面说了在java中这个绑定关系也是一个对象

第二种方式:
@Configuration
public class DirectExchangeConfig {


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
            key = {"red"}
    ))
    public void DirectQueuq1(String mes){
        System.out.println("消费者1接收到direct.queue1的消息:【" + mes + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
            key = {"blue"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
    }

}

第二种方式也是用最多的,就是用这个@RabbitListener这种注解的方式直接指定

并且在里面可以指定属性,比如最基本的name,还有type,就是fanout,direct(默认),topic。

消息转换器:

首先我们需要指定在消息队列中这个消息转换器在做什么

在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

所以我们需要配置JSON转换器:

引入一个依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>
@Configuration
@ConditionalOnClass(value = {MessageConverter.class, RabbitTemplate.class})
public class MqConfig {


    @Bean
    @ConditionalOnBean(ObjectMapper.class)
    public MessageConverter messageConverter(ObjectMapper mapper){
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(mapper);
        // 2.配置自动创建消息id,用于识别不同消息
        jackson2JsonMessageConverter.setCreateMessageIds(true);
        return jackson2JsonMessageConverter;
    }
}

在publisher和consumer中都配置一个这个消息转化器

这里还有一个注意点:

@ConditionalOnClass(value = {MessageConverter.class, RabbitTemplate.class})

这个是spring中的一个注解,作用就是如果这个项目中有相关的依赖我们这个消息转化器才生效

额外的思路:

就是关于一个登录信息传递优化:

某些业务中,需要根据登录用户信息处理业务,而基于MQ的异步调用并不会传递登录用户信息。前面我们的做法比较麻烦,至少要做两件事:

  • 消息发送者在消息体中传递登录用户

  • 消费者获取消息体中的登录用户,处理业务

这样做不仅麻烦,而且编程体验也不统一,毕竟我们之前都是使用UserContext来获取用户。

碰到这个问题的时候就可以把这个用户的id在消息转换器中设置:

信息发送间的可靠性:

消息从生产者到消费者的每一步都可能导致消息丢失:

  • 发送消息时丢失:

    • 生产者发送消息时连接MQ失败

    • 生产者发送消息到达MQ后未找到Exchange

    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue

    • 消息到达MQ后,处理消息的进程发生异常

  • MQ导致消息丢失:

    • 消息到达MQ,保存到队列后,尚未消费就突然宕机

  • 消费者处理消息时:

    • 消息接收后尚未处理突然宕机

    • 消息接收后处理过程中抛出异常

所以我们要从三个方面入手:发送者,MQ自身的持久化,消费者处理信息

发送者的可靠性:

其实这个发送者的可靠性第一个知识点是这个生产者重试机制

就是在配置文件中指定重试的次数,但是后面可以在定义ConfirmCallback中进行手动指定,所以直接讲生产者确认机制

生产者确认机制:
  • ReturnCallback用于处理消息无法路由到队列的情况。

  • ConfirmCallback用于处理消息成功发布到交换机的情况。

    用更多的是ConfirmCallback,直接把最后那个RabbitMqHelper的代码放出来并写上自己的理解过程

贴一段代码:直接在代码中讲:


    //3:生成者确认机制对立
    public void sendConfirmMessage(String exchange,String routingKey,Object msg,int maxRetries){
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());//生成唯一标识符,关联消息和确认结果
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            int retrycount = 0;
            @Override
            public void onFailure(Throwable ex) {
                log.error("处理ack回执失败", ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if(result!=null&&!result.isAck()){
                    log.debug("消息发送失败,收到nack,已重试次数:{}", retrycount);
                    if(retrycount>=maxRetries){
                        log.error("消息发送重试次数耗尽,发送失败");
                        return;
                    }
                }
                CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
                cd.getFuture().addCallback(this);
                rabbitTemplate.convertAndSend(exchange,routingKey,msg);
                retrycount++;
            }
        });
        rabbitTemplate.convertAndSend(exchange,routingKey,msg,cd);
    }

从上到下分析:

CorrelationData 是 Spring AMQP(Advanced Message Queuing Protocol)模块中的一个类,用于在发送消息时关联消息和确认结果。

用途

在发送消息时,CorrelationData 可以用来存储一些与消息相关的数据,如消息的唯一标识符、发送时间等。当消息被确认(ack)或拒绝(nack)时,可以通过 CorrelationData 获取这些数据,以便进行相应的处理。

我们在这段代码中还用UUID给这个消息对象生成了一个唯一ID,避免冲突

// 为确认结果添加回调函数
cd.getFuture().addCallback(new ListenableFutureCallback<>()){
}

这个就是上面异步调用中提到的异步调用留的函数

ListenableFutureCallback 是 Spring Framework 中的一个接口,用于处理 ListenableFuture 的成功和失败结果。ListenableFuture 是一个可以异步获取结果的 Future 接口的扩展。

实现这个接口需要重写两个方法:成功和失败的方法。

再往下走就是写了一个计数的来统计这个调用的次数

我们来重点看一下这个成功onSuccess方法:

要看懂里面的判断逻辑需要补充一下交换机返回的值:

  1. ack(确认):当消息成功发送到队列时,RabbitMQ会向生产者发送一个ack确认。生产者收到ack确认后,就知道消息已经成功投递到队列。

  2. nack(拒绝):当消息无法发送到队列时,RabbitMQ会向生产者发送一个nack拒绝。生产者收到nack拒绝后,可以采取相应的措施,如重试发送消息、记录错误日志等。

我们知道ConfirmCallback就是来判断这个发送方发给交换机这一个过程。

所以交换机找到了可以交付的队列,就传递成功,返回ack,如果没有找到,就返回nack

所以,我们可以根据返回的值是否是ack来判断是否成功了。

有了上面的知识,这个判断逻辑就很简单了

如果不是ack,就和我们自己指定的这个最大尝试次数判断,如果没超过这个重试次数,我们就继续发送消息,发送消息的流程一样,就是生成唯一的消息标识,然后执行回调函数

到最后:

rabbitTemplate.convertAndSend(exchange,routingKey,msg,cd);

如果没有被返回,就说明可以发送,直接发送即可。

MQ的可靠性:

其实这个MQ的可靠性的没什么好说的

因为稍高版本一点的mq已经默认都用了惰性队列

自动持久化,我们只要不自己取消就行。

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存

  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)

  • 支持数百万条的消息存储

这里的磁盘具体是哪里呢?

 RabbitMQ将数据存储在磁盘上的位置取决于其配置。默认情况下,RabbitMQ会将数据存储在RabbitMQ的安装目录下的/var/lib/rabbitmq/mnesia目录中。这个目录包含了RabbitMQ的数据库文件,包括队列、交换机、绑定、用户、权限等元数据。

消费者的可靠性:

消费者确认机制:

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息

  • nack:消息处理失败,RabbitMQ需要再次投递消息

  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,或者是内存不足之类的问题。那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

 我们如果要开启这个消费者确认机制需要修改一下配置即可:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

开启了这个确认机制的作用就是:我这个消息发送到了消费者,消费者处理失败,返回nack,但是这个信息还是保持在消息队列中的。但是如果返回的是reject,那消息队列同样也会删除消息。

失败重试机制:

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。

这个过程听起来就很不行,如果消息一直出错,那样也会占用很多的资源

所以就有了这个失败重试机制。也是需要更改配置:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

失败处理策略:

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

 代码如下:

@Configuration
public class ErrorMessageConfig {

    //定义失败的交换机
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("errorchange");
    }

    //定义失败的队列
    @Bean
    public Queue errorQueue(){
        return new Queue("errorQueue");
    }

    //定义队列和交换机的绑定关系
    @Bean
    public Binding errorBinding(Queue errorQueue,DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("errorRoutingKey");
    }

    //定义errorMessageExchange
    @Bean
    public MessageRecoverer recoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"errorchange","errorRoutingKey");
    }

}

整体的逻辑就是:

先定义一个队列和交换机,并且确立好两者的绑定关系

接着再定义一个RepublishMessageRecoverer绑定队列和交换机

业务幂等性:

 这个概念听起来还蛮难,说起来就是不管多少次操作,结果都是一样的就行

保证业务幂等性的两种方案:

唯一消息id,给每个消息都绑定上一个唯一的id

这个思路非常简单:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。

  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库

  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一ID呢?

其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。

以Jackson的消息转换器为例:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

业务判断:

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。

例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

    @Override
    public void markOrderPaySuccess(Long orderId) {
        // 1.查询订单
        Order old = getById(orderId);
        // 2.判断订单状态
        if (old == null || old.getStatus() != 1) {
            // 订单不存在或者订单状态不是1,放弃处理
            return;
        }
        // 3.尝试更新订单
        Order order = new Order();
        order.setId(orderId);
        order.setStatus(2);
        order.setPayTime(LocalDateTime.now());
        updateById(order);
    }

这里还有一个兜底方案的思路:

就是如果mq实在消息通知失败了。

那我们应该怎么办呢

在讲怎么办之前,我们需要先回顾一下业务流程

用户下完单之后,在支付服务生成订单,然后向用户服务发送请求(远程调用)尝试扣减余额

扣减成功之后,往消息队列中发送一条消息,并且给用户返回支付成功的提示

然后交易服务监听消息队列中的消息进行扣款

结合上面我们说的业务,我们说的问题就是如果用户下单之后,消息队列的工作没有做好,有可能是这个消息没发到消息队列里面去,也有可能是交易服务没收到。

那这个时候我们的兜底方案就是可以在交易服务中查询支付服务的订单(远程调用),看看这个订单是否已经被支付了。

这样一来,这个系统就很稳了。

延迟消息:

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

在RabbitMq中一般这种在一段时间之后才执行的任务:称为延迟任务

在MQ中实现延迟任务的方式:

  • 死信交换机+TTL

  • 延迟消息插件

用的最多是第二种方式:

延迟消息插件

所以接下来就简单介绍一下第一种方式:

死信交换机+TTL

4.1.1.死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息,超时无人消费

  • 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

我们根据上面这个私信交换机的特点:

我们来设想一个方案:

我们往先指定一个正常的交换机和队列,接着再指定一个死信交换机,将这个队列和死信交换机连连接起来,我们往这个队列中发送一条消息,并且给这个消息设置过期时间,但是我们不指定消费者,那我们可想而知,最后这个消息等超过了这个过期时间,就是进入到这个死信交换机中,那不就刚好达成我们延迟消息的目的了嘛。

延迟消息插件

这个插件是rabbitmq社区中的一个插件

下载地址:GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

下载的时候版本需要对应

 安装:

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins
[
    {
        "CreatedAt": "2024-06-19T09:22:59+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]

 插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:

代码编写:

用@RabbitListener方式创建

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

package com.itheima.consumer.config;

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

发送消息时需要指定延迟时间:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}
注意:

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

因此,不建议设置延迟时间过长的延迟消息

在项目中的具体实战:

我们从项目的流程中引出消息队列

然后我们现在又要回到项目中,我们整理一下创建订单的流程是:

1:处理订单数据,包括对商品数据进行校验,获取商品id和商品数量的map,计算总价,再设置一下其它的属性(支付类型,用户id),最后将支付订单存储到order表中

2:将订单的明细存储到订单明细表中

3:清理购物车

4:扣减库存

5:现在需要发送延迟消息到消息队列,接收方收到消息之后校验订单状态然后再修改订单状态保存到数据库

具体代码:

首先可以先定义几个常量,规定一下这个交换机,队列名称,key的值:

public class MqConstant {
    public static final String DELAY_EXCHANGE_NAME = "trade.delay.direct";
    public static final String DELAY_ORDER_QUEUE_NAME = "trade.delay.order.queue";
    public static final String DELAY_ORDER_KEY = "delay.order.query";
}
private final IOrderDetailService detailService;
    private final ItemClient itemClient;
    private final CartClient cartClient;
    private final RabbitTemplate rabbitTemplate;

@Override
    @GlobalTransactional
    public Long createOrder(OrderFormDTO orderFormDTO) {
        // 1.订单数据
        // 2.保存订单详情
        // 3.清理购物车商品
        // 4.扣减库存
        //5.发送延迟消息
        rabbitTemplate.convertAndSend(
                MqConstant.DELAY_EXCHANGE_NAME,
                MqConstant.DELAY_ORDER_KEY,
                order.getId(), new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDelay(10000);
                        return message;
                    }
                }
        );
        return order.getId();
    }

   接着在交易服务中创建一个监听器:

@RequiredArgsConstructor
public class OrderDelayMessageListener {

    private final IOrderService iOrderService;
    private final PayClient payClient;

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(name = MqConstant.DELAY_EXCHANGE_NAME,delayed = "true"),
            value = @Queue(name = MqConstant.DELAY_ORDER_QUEUE_NAME),
            key = MqConstant.DELAY_ORDER_KEY
    ))
    public void listenOrderDelayMessage(Long orderId){
        //1:查询订单
        Order order = iOrderService.getById(orderId);
        //2:检测订单状态,判断是否已支付
        if(order==null||order.getStatus()!=1){
            return;
        }
        //3:未支付,需要查询支付流水状态(就是查pay_order表是否更新了)
        PayOrderDTO payOrderDTO = payClient.queryPayOrderByBizOrderNo(orderId);
        //4:判断是否支付
        if(payOrderDTO==null){
            return;
        }
        Integer status = payOrderDTO.getStatus();
        //4.1:如果已经支付,标记订单为已支付
        if(status==3){
            iOrderService.markOrderPaySuccess(orderId);
        }else {
            //4.2:未支付,取消订单,回复库存
            iOrderService.cancelOrder(orderId);
        }

    }
}

整体代码逻辑:

首先用一个注解来创建这个消息队列

接着去查询订单状态,判断是否已经支付,

如果去查询了订单的状态发现没有支付,但是也不一定是未支付,我们还需要去查支付服务中的订单,如果这个订单没支付那才是真正的没支付

最后根据是否支付来决定即可

目录

MQ概念及同步异步:

同步调用:

异步调用:

消息队列:

RabbitMQ的基本安装和基本概念:

这个RabbitMQ还是使用docker部署:

第一步拉镜像:

第二步:在容器中运行:

基本概念:

在Java中的基本配置:

WorkQueues模型:

交换机的类型:

Fanout广播交换机:

 Direct交换机:

Topic交换机:

在Java中声明交换机和队列和消息转换器的配置 :

第一种方式:

第二种方式:

消息转换器:

额外的思路:

信息发送间的可靠性:

发送者的可靠性:

生产者确认机制:

用途

MQ的可靠性:

消费者的可靠性:

消费者确认机制:

失败重试机制:

失败处理策略:

业务幂等性:

唯一消息id,给每个消息都绑定上一个唯一的id

业务判断:

延迟消息:

死信交换机+TTL

4.1.1.死信交换机

延迟消息插件

代码编写:

注意:

在项目中的具体实战:


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2050667.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

09结构型设计模式——组合模式

一、组合模式的简介 组合模式&#xff08;Composite Pattern&#xff09;是一种结构型设计模式&#xff0c;主要用于处理树形结构中的对象组合问题。它允许你将对象组合成树形结构&#xff0c;以表示部分-整体层次结构。组合模式使得客户端能够统一地对待单个对象和对象组合&a…

SEREN MC2电源匹配器控制器Matching Network Controller手侧

SEREN MC2电源匹配器控制器Matching Network Controller手侧

NC 反转链表

系列文章目录 文章目录 系列文章目录前言 前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站&#xff0c;这篇文章男女通用&#xff0c;看懂了就去分享给你的码吧。 描述 给定一个单链…

亲测解决Bundler HTTPError Could not fetch specs from

这个问题源于ruby的网站连接不上&#xff0c;解决方法是修改网页地址或者网络配置。 环境 win11 ruby 问题原文 Retrying fetcher due to error (2/4): Bundler::HTTPError Could not fetch specs from https://rubygems.org/ due to underlying error <IO::TimeoutEr…

C# 将Dll嵌入exe中发布

一、制作模版Dll 二、在exe工程中添加Dll 1、添加上述“创建Dll”&#xff0c;并修改属性为&#xff1a;不复制到输出目录的嵌入资源 2、引用“Resource”中的dll文件&#xff0c;并修改属性&#xff1a;不复制到本地 三、添加重载Dll代码 1、添加以下代码 class DependentFi…

Modbus 通信协议详解

目录 一、概述二、Modbus 的作用三、Modbus 的工作原理1、四种数据类型2、三种工作模式3、三类功能码3.1 标志功能码3.2 Modbus 封装接口3.3 异常 4、Modbus 协议层4.1 协议数据单元4.2 访问数据4.3 数据模型寻址4.3.1 数据寻址范围4.3.2 数据地址起始值 4.4 大数据类型4.4.1 位…

频率检测计

前言 频率计是一种用于测量信号频率的仪器。它可以准确地确定电子信号的频率&#xff0c;广泛应用于电子设备的测试和维护中。频率计的工作原理通常包括对输入信号进行采样&#xff0c;并通过内部电路计算信号的周期&#xff0c;从而得到频率值。现代频率计通常具有高精度、高稳…

无线通信代码搬运/复现系列(1) : 重新审视具有每天线功率约束的 MIMO 容量:固定点迭代和交替优化

无线通信代码搬运/复现系列(1) “Revisiting the MIMO Capacity with Per-antenna Power Constraint: Fixed-point Iteration and Alternating Optimization,” IEEE Trans. Wireless Commun., vol. 18, no. 1, pp. 388-401, Jan. 2019 by T. M. Pham, R. Farrell, and L.-N. …

C++入门——05STL

STL&#xff08;Standard Template Library&#xff0c;标准模板库&#xff09;是C标准库的重要组成部分&#xff0c;是一个通用的数据结构和算法库。STL提供了一组经过精心设计的模板类和函数&#xff0c;用于处理各种常见的数据结构&#xff08;如容器&#xff09;和算法&…

六. 部署分类器-preprocess-speed-compare

目录 前言0. 简述1. 案例运行2. 代码分析2.1 main.cpp2.2 preprocess.cpp 3. 补充说明结语下载链接参考 前言 自动驾驶之心推出的 《CUDA与TensorRT部署实战课程》&#xff0c;链接。记录下个人学习笔记&#xff0c;仅供自己参考 本次课程我们来学习课程第六章—部署分类器&…

嵌入式面经篇八——进程线程

文章目录 前言一、进程&线程1、异步 IO 和同步 IO 区别&#xff1f;2、进程间通信方式&#xff1f;3、进程的地址空间模型&#xff1f;4、进程的五种状态分别是?5、子进程从父进程继承的资源有哪些&#xff1f;6、什么是进程上下文、中断上下文&#xff1f;7、如何防止僵尸…

写了一个分页 sql,因为粗心出了 bug 造成了 OOM!

大家好&#xff0c;我是君哥。 最近上完线后&#xff0c;凌晨收到一个生产告警&#xff0c;一个 OOM 异常导致了服务重启。今天来分享一下这个事故。 1.事故现场 事故的代码逻辑并不复杂&#xff0c;从一个大概有 8 万数据的表里面查出数据&#xff0c;汇总后对数据做处理。…

高校宣讲会管理系统--论文pf

TOC springboot370高校宣讲会管理系统--论文pf 第1章 绪论 1.1选题动因 当前的网络技术&#xff0c;软件技术等都具备成熟的理论基础&#xff0c;市场上也出现各种技术开发的软件&#xff0c;这些软件都被用于各个领域&#xff0c;包括生活和工作的领域。随着电脑和笔记本的…

【myz_tools】Python库 myz_tools:Python算法及文档自动化生成工具 - 0.2.0版更新

文章目录 0.2.0 更新内容如下函数generate_2d_combinations_iter函数generate_row_permutations函数calculate_total_permutations函数display_combinations函数evaluate_list_similarity函数check_unique 写在前面关于库库使用库内所有函数目录文件名称: common_maths.py函数部…

ZooKeeper分布式协调系统介绍

1. ZooKeeper概述 1.1 ZooKeeper介绍 ZooKeeper 是 Apache 软件基金会的一个项目&#xff0c;它确实提供了一种非常有用的服务&#xff0c;用于维护分布式系统中的配置信息、命名、提供分布式同步和提供组服务等。它的核心是原子广播和大约一致性模型&#xff0c;这使得它能够…

CCF-GESP五级考级——初等数论,全网最精简的求最大公约数gcd和最小公倍数lcm方法(100%好使)

&#x1f451;一、约数和因数的区别 约数必须在整除的前提下才存在&#xff0c;而因数是从乘积的角度来提出的。如果数与数相乘的积是数&#xff0c;是的因数。 1.约数只能对在整数范围内而言&#xff0c;而因数就不限于整数的范围。 举个栗子&#xff1a;。2和8是16的…

中仕公考:国考往年招录情况对比

2025年国考预计10月中旬启动&#xff0c;11月进行笔试。中仕为大家总结了往年的国考招录情况&#xff0c;希望能给大家一些参考。 2024年计划招录3.96万人。截止到考试结束&#xff0c;共有225.2万人参加了考试&#xff0c;参加考试人数与录用计划数之比约为57:1&#xff0c;2…

CSP-J 2023真题一轮

选择题 阅读题 第1题 第2题 第3题 完善程序 第1题 第2题 答案&#xff1a; 一、单选题 1-5 BDAAC 6-10 BCADA 11-15 ABBAD 二、阅读程序 1&#xff09; 16. √ 17. √ 18. ⅹ 19.A 20.B 2&#xff09; 21. √ 22. ⅹ 23. √ 24. D 25.B 26.D 3&#xff09; 27. √ 28. √ 29…

EasyCVR视频汇聚平台构建远程安防监控:5大亮点解析,助力安防无死角

随着科技的飞速发展&#xff0c;远程安防监控系统已经成为现代社会中不可或缺的一部分&#xff0c;无论是在小区、公共场所还是工业领域&#xff0c;安防监控都发挥着至关重要的作用。而EasyCVR作为一款功能强大的视频监控综合管理平台&#xff0c;其在构建远程安防监控系统方面…

谷歌浏览器下载文件被阻止怎么解除

在工作生活中&#xff0c;我们会使用谷歌浏览器下载各种各样的文件&#xff0c;不过偶尔会遇到文件下载被阻止的情况。为了解决这一问题&#xff0c;本文为大家分享了实用的措施建议&#xff0c;一起来了解一下吧。&#xff08;本文由https://chrome.cmrrs.com/站点的作者进行编…