【Java开发】Spring Cloud 10 :Stream消息驱动

news2024/10/6 14:30:44
官方定义Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它为一些供应商的消息中间件产品提供了个性化的自动化配置实现,Spirng Cloud Stream 本质上就是整合了 Spring Boot 和 Spring Integration,实现一套轻量级的消息驱动的微服务框架。通过使用 Spring Cloud Stream 可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

项目源码:尹煜 / coupon-yinyu · GitCode

1 消息驱动介绍

1.1 服务间解耦

如果你认为把服务拆分成微服务就叫做服务间解耦,那咱对微服务的认知还停留在第一层。在很多场景中,我们还需要借助消息驱动组件对业务场景做进一步解耦。

举个例子,在我们网购下单完成付款之后,有一系列的后续业务流程会被执行。比如买家短信和邮件通知、IM 和站内信推送、金币和积分结算、卖家端履约流程等等。有时候搞线上活动,还会在付款完成之后触发赠券服务。

假设这些业务场景都被拆分成了微服务,从业务完整性的角度来讲,为了实现付款后自动触发完整链路,交易服务的回调接口必须挨个调用前面提到的各个业务系统。如果未来需要接入新的业务场景,你还得往回调接口里加上一个新的系统集成点。再从从服务容错的角度考虑,你还得兼顾关键场景(如金币 + 积分结算)的失败重试,这些逻辑都掺和到了支付成功的回调接口里。

所以,即便我们的业务系统是微服务架构,上下游之间的调用还是跑不掉,这种代码中的调用关系也是一种“耦合”。在真实业务中这类场景很普遍,它的特点是通过某个事件(如支付成功)触发多个下游业务场景,像这类场景就特别适合使用消息驱动技术做解耦

比如,我可以将付款成功的信息连同当前订单信息放入一个消息队列中,让所有的下游服务监听这个队列,通过这种“断直连”的方式,我们就将上下游服务之间的耦合间接地解除了。不管以后下游服务要添加什么新场景,对上游服务都几乎是无感知的,因为新的业务场景只要对接消息队列就好,并不需要对上游服务发起调用。

1.2 消息广播

消息广播是相对于单播来讲的。单播是指在同一个消费组里,最多只有一个消费者实例可以去消费消息,而广播则是说,一个消费组里所有的消费者都会对消息做一次消费。

消息广播的一个常用场景是热点数据的处理,热点数据(高流量)是高可用破防能手。各个大厂都有自己的热点侦测方案,这个不展开说了,就说一旦某个资源被甄别成热点数据之后,是不是要通知各个服务“小心防范”?碰到热点资源的访问请求,直接打到专门的热点集群上做处理。

那么这里的“通知”动作,就特别适合使用消息广播的方式来处理,我们只要在侦测到热点数据之后,发送一个消息到特定的消息队列,让各个有可能接收到热点请求的应用服务接入这个队列,执行相应的热点逻辑。

还有一个和热点数据相类似的场景:本地缓存构建。你一定知道通过 Redis 和 Tair 这类缓存系统来抗 QPS,但对于一些访问频次比较高的资源,我们会倾向于在 Client 本地构建一个“本地缓存”,一来堆内缓存一定是访问速度最快的缓存(绝对比外部缓存快),二来可以降低外部缓存的 QPS,毕竟缓存也是能被压崩盘的。

和热点数据同理,这个例子中的“本地缓存”也是可以通过消息广播来构建的。比如在网关或者 RPC 链路上,通过一些流技术对实时调用情况进行聚合分析,将访问频次比较高的资源标记为临时热点,并通过消息驱动推送到各个消费者节点。这样,我们就借助消息广播场景实现了资源标记的推送。

1.3 延迟业务

你可以把这类业务理解为一个闹钟,它是在未来某个时间会被执行的业务逻辑。最常见的一类延迟业务就集中在网购中的订单模块,这里举两个例子。

  • 订单确认:下了单付了款收了货,就是不点确认收货,没关系,7 天之后系统会自动确认。

  • 取消订单:下单之后在 30 分钟时间内没有付款,自动取消订单。

上面这两个场景都可以借助延迟消息来实现,不过在具体实现的时候,你还需要借助消息分区等功能降低消息的积压量。

1.4 削峰填谷

削峰就是指削减峰值流量,如果某个业务的峰值流量超过了系统吞吐量,并且这类业务又非常重要,不能简单粗暴地通过限流熔断把请求 cut 掉,那么你可以考虑把这些请求压入消息队列,让消费者根据自身的吞吐量从队列中获取消息并消费。

填谷就是指闲的没事儿干的时候让你忙起来,当业务峰值已经过去了,流量逐渐减少的时候,先前积压在消息队列中的请求就能被逐渐消化。

削峰填谷其实是一种平滑利用资源的手段,之所以我们能将大量消息压入消息队列,是因为目前主流的消息队列都有非常强大的消息堆积能力。当然了,MQ 组件的消息积压量也是有极限的,在真实的线上业务中,我们会为消息队列构建完善的监控指标,提前对消息积压进行预警。削峰填谷这个用法适合用在一些实时性要求不高,但并发量比较高的业务中。

2 Spring Cloud Stream 集成 RabbitMQ

本项使用Spring Cloud Stream 技术来一场演练,基于 RabbitMQ 消息中间件来落地实践场景。

以往我们在项目中使用 Stream 时,大都是使用经典的 @Input、@Output 和 @StreamListener 等注解来注册消息生产者和消费者,而 Stream 在 3.1 版本之后在这几个注解上打了一个 @Deprecated 标记,意思是这种对接方式已经被淘汰了,不推荐继续使用。取而代之的是更为流行的 Functional Programming 风格,也就是我们俗称的函数式编程。

因为函数式消息驱动在同一个应用包含多个 Event Topic 的情况下有一些特殊配置,所以为了方便演示这个场景,选择了 Customer 服务中的两个具有关联性的业务,分别是用户领取优惠券和删除优惠券,我们就将这两个服务改造成基于消息驱动的实现方式。

2.1 实现消息驱动

业务场景里的消息生产者和消费者都定义在了 Customer 服务中,可能你会以为,在真实项目里,生产者和消费者应该分别定义在不同的应用中,大多数情况下确实如此。比如在上节课的消息广播场景里,一个订单完成之后,通过广播消息触发下游各个服务的业务流程,这里的生产者和消费者是分在不同应用中的。

但是呢,我们也有把生产者和消费者定义在同一个应用中的场景,我叫它自产自销。比如在一些削峰填谷的例子中,为了平滑处理用户流量并降低负载,我们可以将高 QPS 但时效性要求不高的请求堆积到消息组件里,让当前应用的消费者慢慢去处理。比如我曾经实现的批量发布商品就是这么个自产自销的例子,商品服务接收请求后丢到 MQ,让同一个应用内部的消费者慢慢消化。

我们接下来就分三步走,用这个自产自销的路子来实现消息驱动业务。先添加生产者代码,再定义消费者逻辑,最后添加配置文件。

按照惯例,集成之前你需要先把下面这个 Stream 依赖项添加到 coupon-customer-impl 项目的 pom 文件中。由于我们底层使用的中间件是 RabbitMQ,所以我们引入的是 stream-rabbit 组件,如果你使用的是不同的中间件,那么需要引入对口的依赖项。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

① 添加生产者

生产者只做一件事,就是生产一个消息事件,并将这个事件发送到 RabbitMQ。Customer 服务下创建了一个叫做 CouponProducer 的类,添加了 sendCoupon 和 deleteCoupon 这两个生产者方法,分别对应了领取优惠券和删除优惠券。在这两个方法内,使用了 StreamBridge 这个 Stream 的原生组件,将信息发送给 RabbitMQ。

@Service
@Slf4j
public class CouponProducer {

    @Autowired
    private StreamBridge streamBridge;

    public void sendCoupon(RequestCoupon coupon) {
        log.info("sent: {}", coupon);
        streamBridge.send(EventConstant.ADD_COUPON_EVENT, coupon);
    }

    public void deleteCoupon(Long userId, Long couponId) {
        log.info("sent delete coupon event: userId={}, couponId={}", userId, couponId);
        streamBridge.send(EventConstant.DELETE_COUPON_EVENT, userId + "," + couponId);
    }

}

在这段代码里,streamBridge.send 方法的第一个参数是 Binding Name,它指定了这条消息要被发到哪一个信道中,其中 ADD_COUPON_EVENT=addCoupon-out-0,而 deleteCoupon=deleteCoupon-out-0。你先不要管这两个奇怪的值是什么,你只要把 Binding Name 理解成一条消息从 Stream 到达 RabbitMQ 之间的“通道”,待会儿看到配置文件的时候,你就会清楚这条通道是怎么与 RabbitMQ 中定义的消息队列名称关联起来的了。

消息的生产者已经定义好了,接下来在 CouponCustomerController 中新添加了两个方法,单独用来测试我们定义的两个生产者服务。这两个 Controller 方法接收的参数和现有的领券、删除券的接口是一致的,唯二的区别是请求路径后面多了个 Event,以及方法的返回值变成了 void。

@PostMapping("requestCouponEvent")
public void requestCouponEvent(@Valid @RequestBody RequestCoupon request) {
    couponProducer.sendCoupon(request);
}

// 用户删除优惠券
@DeleteMapping("deleteCouponEvent")
public void deleteCouponEvent(@RequestParam("userId") Long userId,
                         @RequestParam("couponId") Long couponId) {
    couponProducer.deleteCoupon(userId, couponId);
}

到这里,我们生产者端的配置就完成了,接下来我们就去编写消息的消费者。

② 添加消息消费者

CouponProducer 的同级目录下创建了一个 CouponConsumer 类,它作为消息的消费者,从 RabbitMQ 处消费由生产者发布的消息事件,方法底层仍然是调用 CustomerService 服务来完成业务逻辑。

在这段代码中,有一个“约定大于配置”的规矩你一定要遵守,那就是不要乱起方法名。这里定义的 addCoupon、deleteCoupon 两个方法名是有来头的,你要确保消费者方法的名称和配置文件中所定义的 Function Name 以及 Binding Name 保持一致,这是 function event 的一条潜规则。因为在默认情况下,框架会使用消费者方法的 method name 作为当前消费者的标识,如果消费者标识和配置文件中的名称不一致,那么 Spring 应用就不知道该把当前的消费者绑定到哪一个 Stream 信道上去。

@Slf4j
@Service
public class CouponConsumer {

    @Autowired
    private CouponCustomerService customerService;

    @Bean
    public Consumer<RequestCoupon> addCoupon() {
        return request -> {
            log.info("received: {}", request);
            customerService.requestCoupon(request);
        };
    }

    @Bean
    public Consumer<String> deleteCoupon() {
        return request -> {
            log.info("received: {}", request);
            List<Long> params = Arrays.stream(request.split(","))
                    .map(Long::valueOf)
                    .collect(Collectors.toList());
            customerService.deleteCoupon(params.get(0), params.get(1));
        };
    }

}

到这里消费者的定义也完成了。在定义生产者和消费者的过程中多次提到了配置文件,下面我们就来看一下 Stream 的配置项都有哪些内容。

③ 添加配置文件

Stream 的配置项比较多,分为 Binder 和 Binding 两部分。我们先来看 Binder 部分,Binder 中配置了对接外部消息中间件所需要的连接信息。如果你的程序中只使用了单一的中间件,比如只接入了 RabbitMQ,那么你可以直接在 spring.rabbitmq 节点下配置连接串,不需要特别指定 binders 配置。

如果你在 Stream 中需要同时对接多个不同类型,或多个同类型但地址端口各不相同的消息中间件,那么你可以把这些中间件的信息配置在 spring.cloud.stream.binders 节点下。其中 type 属性指定了当前消息中间件的类型,而 environment 则指定了连接信息。

spring:
  cloud:
    stream:
      # 如果你项目里只对接一个中间件,那么不用定义binders
      # 当系统要定义多个不同消息中间件的时候,使用binders定义
      binders:
        my-rabbit:
          type: rabbit # 消息中间件类型
          environment: # 连接信息
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      

配置完了 binders,我们接下来看看如何定义 spring.cloud.stream.bindings 节点,这个节点保存了生产者、消费者、binder 和 RabbitMQ 四方的关联关系。

spring:
  cloud:
    stream:
      bindings:
        # 添加coupon - Producer
        addCoupon-out-0:
          destination: request-coupon-topic
          content-type: application/json
          binder: my-rabbit
        # 添加coupon - Consumer
        addCoupon-in-0:
          destination: request-coupon-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: add-coupon-group
          binder: my-rabbit
        # 删除coupon - Producer
        deleteCoupon-out-0:
          destination: delete-coupon-topic
          content-type: text/plain
          binder: my-rabbit
        # 删除coupon - Consumer
        deleteCoupon-in-0:
          destination: delete-coupon-topic
          content-type: text/plain
          group: delete-coupon-group
          binder: my-rabbit
      function:
        definition: addCoupon;deleteCoupon

我们以 addCoupon 为例,你会看到定义了 addCoupon-out-0 和 addCoupon-in-0 这两个节点,节点名称中的 out 代表当前配置的是一个生产者,而 in 则代表这是一个消费者,这便是 spring-function 中约定的命名关系:

Input 信道(消费者):< functionName > - in - < index >;

Output 信道(生产者):< functionName > - out - < index >。

你可能注意到了,在命名规则的最后还有一个 index,它是 input 和 output 的序列,如果同一个 function name 只有一个 output 和一个 input,那么这个 index 永远都是 0。而如果你需要为一个 function 添加多个 input 和 output,就需要使用 index 变量来区分每个生产者消费者了。

现在你已经了解了生产者和消费者的信道是如何定义的,但是,至于这个信道和 RabbitMQ 里定义的消息队列之间的关系,你知道是怎么指定的吗?

信道和 RabbitMQ 的绑定关系是通过 binder 属性指定的。如果当前配置文件的上下文中只有一个消息中间件(比如使用默认的 MQ),你并不需要声明 binder 属性。但如果你配置了多个 binder,那就需要为每个信道声明对应的 binder 是谁。addCoupon-out-0 对应的 binder 名称是 my-rabbit,这个 binder 就是刚才在 spring.cloud.stream.binders 里声明的配置。通过这种方式,生产者消费者信道到消息中间件(binder)的联系就建立起来了。

信道和消息队列的关系是通过 destination 属性指定的。以 addCoupon 为例, addCoupon-out-0 生产者配置项中指定了 destination=request-coupon-topic,意思是将消息发送到名为 request-coupon-topic 的 Topic 中。我又在 addCoupon-in-0 消费者里添加了同样的配置,意思是让当前消费者从 request-coupon-topic 消费新的消息。

RabbitMQ 消息组件内部是通过交换机(Exchange)和队列(Queue)来做消息投递的,如果你登录 RabbitMQ 的控制台,就可以在 Exchanges 下看到声明的 delete-coupon-topic 和 request-coupon-topic。

切换到 Queues 面板,你还会看到这两个交换机所绑定的队列名称。这里的队列名称后面还跟了一个 group name,这就是我在消费者这一侧设置的消息分组,配置项中为 add-coupon-in 设置了 group=add-coupon-group,即当前分组内只有一台机器可以去消费队列中的消息,这就是所谓的“消息分组单播”的场景。如果你不设置 group 属性,那么这个消息就会成为一条“广播消息”。

有一个最为重要的配置项,那就是 spring.cloud.stream.function。如果你的项目中只有一组消费者,那么你完全不用搭理这个配置项,只要确保消费者代码中的 method name 和 bindings 下声明的消费者信道名称相对应就好了;如果你的项目中有多组消费者(比如声明了 addCoupon 和 deleteCoupon 两个消费者),在这种情况下,你需要将消费者所对应的 function name 添加到 spring.cloud.stream.function,否则消费者无法被绑定到正确的信道。

spring:
  cloud:
    stream:
      function:
        definition: addCoupon;deleteCoupon

到这里,我们就完整搭建了一套消息驱动的方案。

2.2 高效处理 Stream 中的异常

如果在 Consumer 消费消息的时候发生了异常,比如用户领取的优惠券超过了券模板约定的上限,或者用户想要删除一张压根不存在的券,那么 Consumer 会抛出一个运行期异常。

你可以调用 deleteCoupon 接口删除一张不存在的优惠券,人为制造一个异常场景,你会观察到,在 Consumer 端的日志中,当前消费者逻辑被执行了三次。这三次执行包括首次消息消费和两次重试,这就是 Stream 默认的一种异常处理方式:消息重试。

① 消息重试

消息重试是一种简单高效的异常恢复手段,当 Consumer 端抛出异常的时候,Stream 会自动执行 2 次重试。重试次数是由 ConsumerProperties 类中的 maxAttempts 参数指定的,它设置了一个消息最多可以被 Consumer 执行几次。

private int maxAttempts = 3;

但需要注意,这个 maxAttempts 并不是重试次数,它其实等于重试次数 +1,加的这个 1 指的就是 Consumer 头一次消费消息的计数。也就是说,如果你人为地设置 maxAttempts=1,那么就代表着当前 Consumer 只会消费一次消息,不会做重试;如果你设置 maxAttempts=2 则表示最多重试一次。那么如何来指定重试次数和重试规则呢?

在 application.yml 文件中,你可以在 spring.cloud.stream.bindings 节点下添加一个 consumer 节点,以 addCoupon-in-0 为例,我通过 consumer 节点指定了消息消费次数、重试间隔还有异常重试规则。

spring:
  cloud:
    stream:
      bindings:
        addCoupon-in-0:
          destination: request-coupon-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: add-coupon-group
          binder: my-rabbit
          consumer:
            # 如果最大尝试次数为1,即不重试
            # 默认是做3次尝试
            max-attempts: 5
            # 两次重试之间的初始间隔
            backOffInitialInterval: 2000
            # 重试最大间隔
            backOffMaxInterval: 10000
            # 每次重试后,间隔时间乘以的系数
            backOffMultiplier: 2
            # 如果某个异常你不想重试,写在这里
            retryableExceptions:
              java.lang.IllegalArgumentException: false

在上面这段代码中,指定了 max-attempts 次数为 5,即一条消息最多被当前 Consumer 重试 4 次。这里还通过三个 backOff 参数指定了每次重试之间的间隔时间,这三个参数的时间单位都是毫秒。其中 backOffInitialInterval 是首次重试时的时间间隔,backOffMaxInterval 指定了两次重试之间最大的时间间隔,而 backOffMultiplier 则指定了重试间隔的相乘系数。

以代码中的参数为例,首次重试会发生在异常抛出 2s 以后,再过 4s 发生第二次重试(即 2s 乘以 backOffMultiplier 时间系数 2),以此类推,再过 8s 发生第三次重试。但第四次重试和第三次之间的间隔并不是 8s*2=16s,因为我们设置了重试的最大间隔时间为 10s,所以最后一次重试会在上一次重试后的第 10s 发起。

除此之外,如果你想为某种特定类型的异常关闭重试功能,你还可以将这些异常类添加到 retryableExceptions 节点下,并指定它的重试开关为 false。比如这里设置了针对 java.lang.IllegalArgumentException 类型的异常一律不发起重试,Consumer 消费失败时这个异常会被直接抛到最外层。

本地重试是一种简单高效的容错手段,但你需要注意确保幂等性,如果 Consumer 端的业务逻辑不具备幂等性,那么千万不要发起任何重试操作。在多次重试之间,你要尽可能使用 backOff 参数设置一定的间隔,这样做的目的是规避一些短周期的服务故障。比如网络连接在几秒钟之内发生了故障,导致 Consumer 无法调用目标服务,如果你的重试间隔是 0s,那么短时间内连续重试,极大概率会获得多个一样的 Connection 异常,而如果每次重试之间有一个梯度递增的间隔时间,往往就可以规避短期服务故障导致的重试失败问题。

除了本地重试以外,你还可以把这个失败的消息丢回到原始队列中,做一个 requeue 的操作。在 requeue 模式下,这个消息会以类似“roundrobin”的方式被集群中的各个 Consumer 消费,你可以参考下面的配置,指定 Consumer 添加了 requeue 的功能。如果你打算使用 requeue 作为重试条件,那么就不要留恋“本地重试”了,把 max-attempts 设置为 1 吧。

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          # requeue重试
          addCoupon-in-0:
            consumer:
              requeue-rejected: true

② 异常降级方法

不止服务调用可以指定降级方法,消费消息也可以指定这样一段降级逻辑。如果你的服务重试了几次仍然没有成功,那么你就可以借助 spring-integration 组件的能力,为 Consumer 指定一段异常处理方法。

以用户领券的服务为例,在CouponConsumer里边通过 spring-integration 的注解 @ServiceActivator 做了一个桥接,将指定 Channel 的异常错误转到本地方法里。

@ServiceActivator(inputChannel = "request-coupon-topic.add-coupon-group.errors")
public void requestCouponFallback(ErrorMessage errorMessage) throws Exception {
    log.info("consumer error: {}", errorMessage);
    // 实现自己的逻辑
}

在这段代码中,inputChannel 属性的值是由三部分构成的,它的格式是:..errors。通过 topic 和 group 指定了当前的 inputChannel 是来自于哪个消息队列和分组。

对于一些非常重要的消息驱动场景,如果重试几次还是失败,那么你就可以在异常降级方法里接入通知服务,将情况告知到具体的团队。比如在商品批量改价的场景中,如果价格更新失败,那么很有可能导致线上资损,这边方案是在降级逻辑里接入钉钉接口,把告警消息推送到指定群,通知相关团队尽快做人工介入。

降级逻辑处理完之后,这个原始的 Message 怎么办呢?如果你想要保留这条出错的 Message,那你可以选择将它发送到另一个 Queue 里。待技术团队将异常情况排除之后,你可以选择在未来的某一个时刻将 Queue 里的消息重新丢回到正常的队列中,让消费者重新处理。当然了,你也可以声明一个消费者,专门用来处理这个 Queue 里的消息。

这个特殊的 Queue 就叫做死信队列,它是那些几经重试彻底没救的消息的最终归宿。接下来了解一下怎么去配置死信交换机。

③ 配置死信队列

要触发死信队列很简单,你只要在刚才的降级方法里抛出一个 RuntimeException 就可以了。如果你没有设置降级方法,但最后一次重试抛出了异常,消息也会被移送到死信队列。

在配置死信队列之前,可以先安装两个 RabbitMQ 的插件,分别是 rabbitmq_shovel 和 rabbitmq_shovel_management。这两个插件是用来做消息移动的,让我们可以将死信队列的消息移动到其它正常队列重新消费。

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

这两个插件已经预装在了 RabbitMQ 中,只是处于未开启的状态,你可以在命令行执行上面这两行命令,开启插件,完事儿后记得重启 RabbitMQ。

接下来就以 deleteCoupon 这个场景为例,配置一个死信队列。如果用户想要删除一个不存在的优惠券,后台服务就会抛出一个异常,用它来演示死信队列再合适不过了。设置死信队列的第一步就是在配置文件中将消费者所对应的 Queue 绑定到死信交换机上,你可以参考下面这段代码。

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          deleteCoupon-in-0:
            consumer:
              auto-bind-dlq: true

因为我们底层的消息组件是 RabbitMQ,所以这段配置被添加到了 spring.cloud.stream.rabbit 路径下。对应的 Consumer 信道上设置了 auto-bind-dlq=true,开启了死信队列的功能。

理论上到这里你就可以启动项目验证死信队列的功能了,不过呢,如果你没有更换消息队列的名称,那么在程序尝试向死信队列插入数据的时候,你一定会看到一段报错信息:

channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg ‘x-dead-letter-exchange’ for queue ‘delete-coupon-topic.delete-coupon-group’ in vhost ‘/’: received the value ‘DLX’ of type ‘longstr’ but current is none, class-id=50, method-id=10)

其实是在说当前的队列不具备死信交换机的功能。因为这个队列是一个已经存在的队列,而创建这个队列的时候,我们并没有添加 auto-bind-dlq 参数,以至于它并不具备死信队列的路由功能。

接下来你只需要登录到 RabbitMQ 的控制台,在 Queues 面板下进入队列的详情页,点击“Delete Queue”按钮将队列删除掉。然后重新启动应用程序,这时 Stream 会重新创建一个具备死信路由功能的队列了。

你可以看到,图中第一个队列的 Features 标签中多了两个 Tag,分别是 DLX 和 DLK,说明当前队列已经具备了将失败消息路由到死信队列的能力了。其中 DLX 是死信交换机,它根据 Routing Key(即 DLK)将消息路由到死信队列。

第二个队列的名称和第一个队列几乎一样,唯一区别就是末尾多了一个.dlq,这个 dlq 就是死信队列的标志,说明第二个队列是第一个队列的死信队列。

你可以在本地发起几个方法调用,尝试删除压根不存在的优惠券,这时你就会从 RabbitMQ 控制台中发现一个现象,在每次调用失败后,死信队列的消息数量都会自动加一,这就说明整套死信队列方案配置成功。

使用死信队列的一个好处就是,它可以保留原始的消息,给技术人员提供一种异常恢复的途径。怎么恢复?很简单,我们刚才安装的 shovel 插件就派上用场了,你只要点击进入死信队列的详情页,找到 Move messages 这个标签页,在 Destination queue 里填上你想要移动到的目标队列,点击 Move messages 就可以了。通常的做法是待故障恢复之后,将死信队列的消息转移到原始的队列进行重新消费。

2.3 通过 RabbitMQ 插件实现延迟消息

平时网购的时候,你一定有过下单之后忘记付款的情况,等到再回过头想起要付款,发现订单已经被关闭了,很多网购流程里都有类似的“订单超时关闭”功能。相类似的功能还有“自动确认收货”,如果在一定时间内买家都没有点击确认收货按钮,那么系统会自动确认收货并且将订单款项打给卖家。

① 安装插件

你需要先打开 RabbitMQ 官网并进入到插件下载页面,在页面中定位到 rabbitmq_delayed_message_exchange 这个插件。

点击插件上的“Releases”链接,你可以看到适配不同 RabbitMQ 版本的延迟消息插件。

接下来,你需要把安装包的后缀名从.ez 改成.zip,然后使用解压缩工具对安装包进行解压。再把解压后的文件复制到 RabbitMQ 安装路径下的 plugins 文件夹。以本地 MAC 环境为例,plugins 目录位于 /usr/local/Cellar/rabbitmq/3.9.8/plugins,你需要根据自己的操作系统和安装路径找到对应的目录。

然后,你需要执行下面这行 rabbitmq-plugins 命令,通过人工的方式启动 rabbitmq_delayed_message_exchange 插件。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

最后,你只需要重启一下 RabbitMQ 服务器,新安装的插件就可以生效了,接下来我们就可以通过代码落地延迟领劵业务了。实现延迟领券

② 创建生产者

因为延迟消息队列和普通消息队列的类型不同,为了和之前的普通领券接口做个区分,我们今天要声明一个新的生产者和消费者,用来对接延迟消息队列,先从生产者开始创建。

我们依然保持队形,将生产者方法写入 CouponProducer 这个类中,你可以参考一下下面的代码。

在这段代码中,有一个显而易见的不同之处,你会发现我没有直接将 coupon 对象传递给生产者,取而代之的是使用了 MessageBuilder 来构建消息对象,这样做的一个目的是传入一个特殊的 header,那就是 x-delay。它是延迟消息特有的参数,代表了你想让这个消息在 Queue 里延迟多久以后再被消费者处理,x-delay 对应的单位是毫秒,代码中设置的延迟时间是 10 秒。

// 使用延迟消息发送
public void sendCouponInDelay(RequestCoupon coupon) {
    log.info("sent: {}", coupon);
    streamBridge.send(EventConstant.ADD_COUPON_DELAY_EVENT,
            MessageBuilder.withPayload(coupon)
                    .setHeader("x-delay", 10 * 1000)
                    .build());
}

代码中的 ADD_COUPON_DELAY_EVENT 的值是 addCouponDelay-out-0,它是单独为延迟消息队列指定的 function name。

接下来,在 CouponCustomerController 类中声明了一个入口方法,用来对接生产者方法创建延迟消息。

@PostMapping("requestCouponDelayEvent")
public void requestCouponDelayedEvent(@Valid @RequestBody RequestCoupon request) {
    couponProducer.sendCouponInDelay(request);
}

③ 声明消费者

在消费者这一端,延迟消息和普通消息的实现方式并没有任何不同,你可以把下面这段代码加入到 CouponConsumer 类中。

@Bean
public Consumer<RequestCoupon> addCouponDelay() {
    return request -> {
        log.info("received: {}", request);
        customerService.requestCoupon(request);
    };
}

你需要留意一下消费者的方法名称,一定要保证这里的方法名和配置文件中的 function name 保持完全的一致。消费者创建完成之后,我们最后还需要对配置文件做一些修改。

④ 修改配置文件

这一步中我们需要做的就是把生产者和消费者添加到 application.yml 文件中,你可以参考下面这段代码。

spring:
  cloud:
    stream:
      bindings:
        # 延迟发券 - producer
        addCouponDelay-out-0:
          destination: request-coupon-delayed-topic
          content-type: application/json
          binder: my-rabbit
        # 延迟发券 - Consumer
        addCouponDelay-in-0:
          destination: request-coupon-delayed-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: add-coupon-group
          binder: my-rabbit
          consumer:
            # 如果最大尝试次数为1,即不重试
            # 默认是做3次尝试
            max-attempts: 1
      function:
        definition: addCoupon;deleteCoupon;addCouponDelay
      rabbit:
        bindings:
          addCouponDelay-out-0:
            producer:
              delayed-exchange: true
          addCouponDelay-in-0:
            consumer:
              delayed-exchange: true

第一个是 function name 的统一。在 spring.cloud.stream.function.definition 中添加了 addCouponDelay 作为 functiona name,它和 Consumer 方法中声明的 method name 是一致的。

第二个关键点是绑定生产者消费者 Topic。你会发现生产者和消费者端的 destination 属性中声明了一个全新的 Topic,request-coupon-delayed-topic,这样做是为了重新创建一个带有 x-delay-message 功能的交换机。

第三个关键点是声明延迟消息功能。在 bindings 节点下面声明的生产者和消费者配置项中,设置了 delayed-exchange=true,这是延迟队列最为关键的一个属性。如果没有设置,那么系统将会创建一个普通的交换机,而不是具有延迟消费功能的交换机。

实现延迟消息功能所需要的全部操作就完成了,你可以启动项目并尝试发送几个请求,来验证消息是否会延迟消费。

如果你登录到 RabbitMQ 控制台查看交换机信息,你会发现我们今天声明的延迟消息交换机(request-coupon-delayed-topic)和第前边声明的常规交换机(request-coupon-topic)之间的不同,延迟交换机的类型是 x-delayed-message,并且带有 DM 功能标签,这代表当前交换机具备延迟消费功能。

总结

利用 RabbitMQ 搭建延迟消息的过程并不复杂,不过当项目中 Topic 多起来的时候,function name 的配置很容易出错。当你和一个遵循“约定大于配置”的框架打交道的时候,经常会因为没有遵循一个不起眼的约定,导致功能不 work,而且排查起来特别困难。可见事物总是相对的,约定大于配置的思想在提高开发效率的同时,也略微抬高了入门成本和异常排查的成本。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/179672.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python内置包Tkinter的重要控件(下)

本文将接着介绍剩下的五个重要的控件&#xff0c;包括Canvas&#xff0c;Messagebox&#xff0c;Listbox&#xff0c;Checkbutton&#xff0c;Radiobutton。 目录 前言 控件 1. Canvas 2. Messagebox 3. Listbox 4. Radiobutton 5. Checkbutton 总结 前言 包括但不…

VBA提高篇_08 数据源类型判断 / 四舍五入

文章目录数据类型操作1. 数据类型判断2.数据类型转换2.1转换函数2.2 关于小数数据类型的四舍五入2.2.1 银行家舍入法2.2.2 Round()函数2.2.3 Int()函数数据类型操作 1. 数据类型判断 IsDate() 是否是日期类型 IsNumeric() 是否是数值类型 TypeName(x) 返回x 的数据类型的名称…

移动端特点和flex布局

移动端特点和flex布局移动端特点物理分辨率和逻辑分辨率视口视口标签二倍图百分比布局flex布局主轴对齐方式侧轴对齐方式伸缩比圣杯布局移动端特点 PC端/移动端不同 PC端 屏幕大&#xff0c;网页固定版心浏览器繁多&#xff0c;更多考虑兼容性问题。&#xff08;布局&#xf…

Gin+Vite实现单图上传

前言 参考文献&#xff1a;https://blog.csdn.net/heian_99/article/details/122447855 案例目的&#xff1a;实现前端上传图片并显示&#xff0c;后端保存图片&#xff1b; 技术&#xff1a;elementplus、axios、vue3、vite、gin 实现原理&#xff1a; 前端请求对应后端接口…

连通性1(Tarjan 理论版)

目录 一、无向图割点、桥、双连通分量 Tarjan 算法求割点和桥&#xff08;割边&#xff09; “割点”代码 边双和点双连通分量 边双连通分量 和 点双连通分量 的缩点 二、有向图强连通分量 1.有向图的弱连通与强连通 2.强连通分量 Kosaraju算法 Tarjan 算法&#xff08…

读书笔记:Python绘制三维图像 ← 斋藤康毅

下文给出了绘制函数 的 Python 代码。 很显然&#xff0c;这是一个三维图像。【绘制三维图像的Python代码】 import numpy as np import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3Dfigplt.figure() axAxes3D(fig) x1np.arange(-3.0, 3.0, 0.1) x2np.…

python刷题-关于日期、正则表达式的题

目录标题1、计算日期范围内的所有日期2、将Unix时间戳转换为格式化日期3、计算日期数据周同比4、正则表达式判断字符串是否是日期5、从文本中提取手机号码 --正则表达式6、批量提取网页上的手机号码7、自动提取电子邮箱地址8、验证用户密码是否规范-re.findall9、提取商品价格1…

ELK简介

什么是ELKE: Elasticsearch全文搜索引擎L: logstash日志采集工具K: kibana ES的可视化工具ELK是当今业界非常流行的日志采集保存和查询的系统我们编写的程序,会有很多日志信息,但是日志信息的保存和查询是一个问题IDEA控制台是一个临时显示的位置,我们可以将它保存在文件中但是…

Jetpack架构组件库:Room

Room Room是一款轻量级orm数据库&#xff0c;本质上是一个基于SQLite之上的抽象层。它通过注解的方式提供相关功能&#xff0c;编译时自动生成实现Impl&#xff0c;相比纯 SQLite 的API使用方式更加简单。另外一个相比于SQLite API的优势是&#xff1a;它会在编译时检查 SQL 语…

SpringBoot+Vue项目在线视频教育平台

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏…

网络原理之HTTP/HTTPS、TCP、IP四层协议栈

文章目录一、应用层&#xff08;一&#xff09;xml协议&#xff08;二&#xff09;json协议&#xff08;三&#xff09;protobuffer协议&#xff08;四&#xff09;HTTP协议1. 抓包工具&#xff0c;fiddler2. HTTP报文格式3. HTTP请求(Request)&#xff08;1&#xff09;URL基本…

【VisualBasicApplication】Excel编程

VBAExcel的宏与VBA宏的录制宏的启动运行快捷键运行宏&#xff1a;使用Excel对象运行宏*VBA的数据类型字符串&#xff08;String&#xff09;整形&#xff08;Integer&#xff09;和长整形&#xff08;Long&#xff09;单精度浮点型&#xff08;Single&#xff09;和双精度浮点型…

3.mysql查询必备sql语句

文章目录1.条件查询 where2. 通配符与模糊查询3. 映射4. 排序 order_by5. 取部分 limit 和offset6. 分组 group by7.左右连表 left outer join ... on8. 联合查询 union1.条件查询 where 表内容&#xff1a; import pymysqlconn pymysql.connect(host127.0.0.1,port3306,u…

向QAbstractItemView子类如:QTreeView、QTableView等子项单元格插入窗体小部件的功能实现(第2种方法)

1.前言工作中经常会遇到这样的需求&#xff1a;向QAbstractItemView子类如QTreeView、QTableView单元格插入窗体小部件&#xff0c;如&#xff1a;进度条、按钮、单行编辑框等。下面链接的系列博文就是讲解如何实现该功能的。《向QAbstractItemView子类如:QTreeView、QTableVie…

LeetCode 2500. 删除每行中的最大值

给你一个 m x n 大小的矩阵 grid &#xff0c;由若干正整数组成。 执行下述操作&#xff0c;直到 grid 变为空矩阵&#xff1a; 从每一行删除值最大的元素。如果存在多个这样的值&#xff0c;删除其中任何一个。 将删除元素中的最大值与答案相加。 注意 每执行一次操作&#…

行为型模式-状态模式

1.概述 【例】通过按钮来控制一个电梯的状态&#xff0c;一个电梯有开门状态&#xff0c;关门状态&#xff0c;停止状态&#xff0c;运行状态。每一种状态改变&#xff0c;都有可能要根据其他状态来更新处理。例如&#xff0c;如果电梯门现在处于运行时状态&#xff0c;就不能…

时序数据处理中的拟合问题

对于深度学习或机器学习模型而言,我们不仅要求它对训练数据集有很好的拟合(训练误差),同时也希望它可以对未知数据集(测试集)有很好的拟合结果(泛化能力),所产生的测试误差被称为泛化误差。度量泛化能力的好坏,最直观的表现就是模型的过拟合(overfitting)和欠拟合(…

一起Talk Android吧(第四百七十五回:渐变类视图动画)

文章目录使用方法属性介绍示例代码共用属性各位看官们大家好&#xff0c;上一回中咱们说的例子是"如何使用视图动画",这一回中咱们说的例子是"渐变类视图动画"。闲话休提&#xff0c;言归正转&#xff0c;让我们一起Talk Android吧&#xff01; 看官们&am…

移动web动画

移动web动画动画动画属性鼠标经过暂停动画多组动画鼠标经过暂停动画多组动画动画 动画最大的特点可以不用鼠标触发&#xff0c;自动的&#xff0c;反复的执行某些动画。 动画使用分为定义和调用&#xff1a; 定义&#xff1a; /* 1. 定义的动画 */ keyframes dance {from {tr…

恶意代码分析实战 12 对抗反汇编

12.1 Lab15-01 问题 这个二进制程序中使用了何种对抗反汇编技术&#xff1f; 首先&#xff0c;使用IDA载入该文件。 我们可以看到这个程序在地址0040100E处存在一个对抗反汇编技术的痕迹。 eax总是被置为零&#xff0c;jz跳转总是被执行。所以我们认为这一行是假冒的call指…