Spring Boot 整合 RocketMQ 之顺序消息

news2024/10/18 14:03:04

前言:

上一篇我们分享了 Spring Boot 整合 RocketMQ 完成普通消息发送的过程,本篇我们来分享一下 RocketMQ 顺序消息的发送。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 整合 RocketMQ 之普通消息

Spring Boot 整合 RocketMQ 之定时/延时消息

RocketMQ 顺序消息的使用场景

典型场景一:撮合交易

在这里插入图片描述
以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

典型场景二:数据实时增量同步

在这里插入图片描述
以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。

RocketMQ 如何保证消息的顺序性?

Producer 保证消息有序性

RocketMQ 只能保证一个队列中的消息是有顺序的,不同队列之间的消息是无法保证顺序的,因此我们要想使用 RocketMQ 完成顺序消息的功能,就必须保证消息发送到同一个队列中,RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。

如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送:RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

满足以上条件的生产者,将顺序消息发送至 RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

  • 相同消息组的消息按照先后顺序被存储在同一个队列。
  • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

举例如下:
在这里插入图片描述
如上图所示,消息组1和消息组4的消息混合存储在队列1中, Apache RocketMQ 保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。

Consumer 保证消息有序性

  • RocketMQ 提供了两种消费模式,有序消费模式 ConsumeMode.ORDERLY 和并发消费模式ConsumeMode.CONCURRENTLY,因此我们的消费端也要使用有序消费模式。
  • RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

了解了 RocketMQ 的顺序消息原理后,接下来我们进入实战部分。

RocketMQ 实现顺序消息发送

RocketMQTemplate 给我们提供了 SendOrderly 方法,来实现发送顺序消息,主要分为三类,如下:

  • syncSendOrderly:发送同步顺序消息,支持超时设置。
  • asyncSendOrderly:发送异步顺序消息,支持超时设置。
  • sendOneWayOrderly:发送单向超时消息。

顺序消息生产者代码如下:

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;


/**
 * @ClassName: OneWayMessageProducer
 * @Author: zhangyong
 * @Date: 2024/9/27 17:27
 * @Description: 顺序消息发送者
 */
@Slf4j
@Component
public class OrderlyMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;


    /**
     * @date 2024/10/12 15:13
     * @description 单向顺序消息
     */
    public void sendOneWayOrderly(){
        rocketMqTemplate.sendOneWayOrderly("orderly-topic", MessageBuilder.withPayload("单向顺序消息,订单编号:666666 创建").build(), "666666");
        rocketMqTemplate.sendOneWayOrderly("orderly-topic", MessageBuilder.withPayload("单向顺序消息,订单编号:666666 支付").build(), "666666");
        rocketMqTemplate.sendOneWayOrderly("orderly-topic", MessageBuilder.withPayload("单向顺序消息,订单编号:666666 确认收货").build(), "666666");
    }


    /**
     * @date 2024/10/11 15:45
     * @description 同步顺序消息
     */
    public void syncSendOrderly() {
        //hashKey 用来计算决定消息发送到哪个队列  一般是订单 ID 等信息  这里我们模拟订单 ID 发送
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 创建").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 支付").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 确认收货").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 创建").build(), "888888");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 支付").build(), "888888");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 确认收货").build(), "888888");
    }

    /**
     * @date 2024/10/11 15:52
     * @description 发送异步顺序消息
     */
    public void asyncSendOrderly() throws InterruptedException {
        //hashKey 用来计算决定消息发送到哪个队列  一般是订单 ID 等信息  这里我们模拟订单 ID 发送
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:666666 创建").build(), "666666", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:666666 创建,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:666666 创建,异步消息发送失败");
            }
        });
        Thread.sleep(200);
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:666666 支付").build(), "666666", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:666666 支付,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:666666 支付,异步消息发送失败");
            }
        });
        Thread.sleep(200);
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:666666 确认收货").build(), "666666", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:666666 确认收货,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:666666 确认收货,异步消息发送失败");
            }
        });
        Thread.sleep(200);
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:888888 创建").build(), "888888", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:888888 创建,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:888888 创建,异步消息发送失败");
            }
        });
        Thread.sleep(200);
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:888888 支付").build(), "888888", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:888888 支付,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:888888 支付,异步消息发送失败");
            }
        });
        Thread.sleep(200);
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步顺序消息,订单编号:888888 确认收货").build(), "888888", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:888888 确认收货,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:888888 确认收货,异步消息发送失败");
            }
        });
    }


    /**
     * @date 2024/10/11 16:10
     * @description 同步顺序超时消息
     */
    public void syncSendOrderlyTimeOut() {
        //hashKey 用来计算决定消息发送到哪个队列  一般是订单 ID 等信息  这里我们模拟订单 ID 发送
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:666666 创建").build(), "666666",200);
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:666666 支付").build(), "666666",200);
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:666666 确认收货").build(), "666666",200);
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:888888 创建").build(), "888888",200);
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:888888 支付").build(), "888888",200);
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步带超时顺序消息,订单编号:888888 确认收货").build(), "888888",200);
    }

    /**
     * @date 2024/10/11 15:52
     * @description 发送异步顺序超时消息
     */
    public void asyncSendOrderlyTimeOut() throws InterruptedException {
        //hashKey 用来计算决定消息发送到哪个队列  一般是订单 ID 等信息  这里我们模拟订单 ID 发送
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:666666 创建").build(), "666666", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:666666 创建,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:666666 创建,异步消息发送失败");
            }
        },200);
        Thread.sleep(200);
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:666666 支付").build(), "666666", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:666666 支付,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:666666 支付,异步消息发送失败");
            }
        },200);
        Thread.sleep(200);
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:666666 确认收货").build(), "666666", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:666666 确认收货,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:666666 确认收货,异步消息发送失败");
            }
        },200);
        Thread.sleep(200);
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:888888 创建").build(), "888888", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:888888 创建,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:888888 创建,异步消息发送失败");
            }
        },200);
        Thread.sleep(200);
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:888888 支付").build(), "888888", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:888888 支付,异步消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:888888 支付,异步超时消息发送失败");
            }
        },200);
        Thread.sleep(200);
        rocketMqTemplate.asyncSendOrderly("orderly-topic", MessageBuilder.withPayload("异步带超时顺序消息,订单编号:888888 确认收货").build(), "888888", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("订单编号:888888 确认收货,异步超时消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("订单编号:888888 确认收货,异步超时消息发送失败");
            }
        },200);
    }



}

生产者代码分析:

我们在代码中分别模拟了单向顺序消息、同步顺序消息、异步顺序消息、同步带超时顺序消息、异步带超时顺序消息的发送,这里提一席一下异步顺序消息代码中的 sleep,因为异步消息是非阻塞的,这里我们又是模拟的业务,因此使用 sleep 操作来保证消费者消息发送的顺序性。

顺序消息消费者代码如下:

顺序消息的消费者代码和普通消息的的消费者代码一样,并没有什么特殊之处。

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "one-way-group", topic = "one-way-topic")
public class OneWayMessageCousumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("单向消息消费成功:{}", message);
    }
}

单向顺序消息验证:

单向顺序消息只是在单向消息的基础上加了一个顺序,单向顺序消息也是不关注消息发送结果的,。

2024-10-12 19:26:54.680  INFO 29852 --- [MessageThread_1] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:单向顺序消息,订单编号:666666 创建
2024-10-12 19:26:54.682  INFO 29852 --- [MessageThread_1] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:单向顺序消息,订单编号:666666 支付
2024-10-12 19:26:54.682  INFO 29852 --- [MessageThread_1] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:单向顺序消息,订单编号:666666 确认收货

消息是按照创建、支付、确认收货的流程消费的,符合预期。

同步顺序消息验证:

同步顺序消息和同步消息一样,是阻塞的。

2024-10-12 19:27:58.806  INFO 29852 --- [MessageThread_2] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步顺序消息,订单编号:666666 创建
2024-10-12 19:27:58.808  INFO 29852 --- [MessageThread_3] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步顺序消息,订单编号:666666 支付
2024-10-12 19:27:58.810  INFO 29852 --- [MessageThread_4] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步顺序消息,订单编号:666666 确认收货
2024-10-12 19:27:58.812  INFO 29852 --- [MessageThread_5] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步顺序消息,订单编号:888888 创建
2024-10-12 19:27:58.814  INFO 29852 --- [MessageThread_6] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步顺序消息,订单编号:888888 支付
2024-10-12 19:27:58.817  INFO 29852 --- [MessageThread_7] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步顺序消息,订单编号:888888 确认收货

消息是按照创建、支付、确认收货的流程消费的,同时模拟的两个订单的消息顺序也没有错乱,符合预期。

异步顺序消息验证:

单向顺序消息只是在单向消息的基础上加了一个顺序,单向顺序消息也是不关注消息发送结果的。

2024-10-12 19:33:35.681  INFO 29852 --- [ublicExecutor_1] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:666666 创建,异步消息发送成功
2024-10-12 19:33:35.682  INFO 29852 --- [MessageThread_8] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步顺序消息,订单编号:666666 创建
2024-10-12 19:33:35.881  INFO 29852 --- [ublicExecutor_2] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:666666 支付,异步消息发送成功
2024-10-12 19:33:35.882  INFO 29852 --- [MessageThread_9] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步顺序消息,订单编号:666666 支付
2024-10-12 19:33:36.082  INFO 29852 --- [ublicExecutor_3] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:666666 确认收货,异步消息发送成功
2024-10-12 19:33:36.082  INFO 29852 --- [essageThread_10] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步顺序消息,订单编号:666666 确认收货
2024-10-12 19:33:36.282  INFO 29852 --- [ublicExecutor_4] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:888888 创建,异步消息发送成功
2024-10-12 19:33:36.282  INFO 29852 --- [essageThread_11] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步顺序消息,订单编号:888888 创建
2024-10-12 19:33:36.483  INFO 29852 --- [ublicExecutor_5] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:888888 支付,异步消息发送成功
2024-10-12 19:33:36.484  INFO 29852 --- [essageThread_12] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步顺序消息,订单编号:888888 支付
2024-10-12 19:33:36.683  INFO 29852 --- [ublicExecutor_6] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:888888 确认收货,异步消息发送成功
2024-10-12 19:33:36.685  INFO 29852 --- [essageThread_13] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步顺序消息,订单编号:888888 确认收货

消息是按照创建、支付、确认收货的流程消费的,同时模拟的两个订单的消息顺序也没有错乱,符合预期。

同步带超时顺序消息验证:

同步带超时顺序消息在保证顺序的同时,可以设置一个超时时间。

2024-10-12 20:05:48.819  INFO 29984 --- [MessageThread_6] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步带超时顺序消息,订单编号:666666 创建
2024-10-12 20:05:48.820  INFO 29984 --- [MessageThread_7] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步带超时顺序消息,订单编号:666666 支付
2024-10-12 20:05:48.822  INFO 29984 --- [MessageThread_8] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步带超时顺序消息,订单编号:666666 确认收货
2024-10-12 20:05:48.824  INFO 29984 --- [MessageThread_9] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步带超时顺序消息,订单编号:888888 创建
2024-10-12 20:05:48.826  INFO 29984 --- [essageThread_10] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步带超时顺序消息,订单编号:888888 支付
2024-10-12 20:05:48.829  INFO 29984 --- [essageThread_11] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:同步带超时顺序消息,订单编号:888888 确认收货

消息是按照创建、支付、确认收货的流程消费的,同时模拟的两个订单的消息顺序也没有错乱,符合预期。

异步带超时顺序消息验证:

异步带超时顺序消息在保证顺序的同时,可以设置一个超时时间。

2024-10-12 20:08:13.126  INFO 29984 --- [ublicExecutor_1] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:666666 创建,异步消息发送成功
2024-10-12 20:08:13.127  INFO 29984 --- [essageThread_12] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步带超时顺序消息,订单编号:666666 创建
2024-10-12 20:08:13.327  INFO 29984 --- [ublicExecutor_2] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:666666 支付,异步消息发送成功
2024-10-12 20:08:13.327  INFO 29984 --- [essageThread_13] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步带超时顺序消息,订单编号:666666 支付
2024-10-12 20:08:13.528  INFO 29984 --- [ublicExecutor_3] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:666666 确认收货,异步消息发送成功
2024-10-12 20:08:13.529  INFO 29984 --- [essageThread_14] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步带超时顺序消息,订单编号:666666 确认收货
2024-10-12 20:08:13.729  INFO 29984 --- [ublicExecutor_4] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:888888 创建,异步消息发送成功
2024-10-12 20:08:13.729  INFO 29984 --- [essageThread_15] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步带超时顺序消息,订单编号:888888 创建
2024-10-12 20:08:13.929  INFO 29984 --- [ublicExecutor_5] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:888888 支付,异步消息发送成功
2024-10-12 20:08:13.930  INFO 29984 --- [essageThread_16] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步带超时顺序消息,订单编号:888888 支付
2024-10-12 20:08:14.130  INFO 29984 --- [ublicExecutor_6] c.o.s.r.producer.OrderlyMessageProducer  : 订单编号:888888 确认收货,异步超时消息发送成功
2024-10-12 20:08:14.130  INFO 29984 --- [essageThread_17] c.o.s.r.consumer.OrderlyMessageConsumer  : 顺序消息消费成功:异步带超时顺序消息,订单编号:888888 确认收货

RocketMQ 消费端保证消息顺序性揭秘

前面我们分析了消费端 Consumer 保持顺序性,需要使用有序消费模式 ConsumeMode.ORDERLY(不要使用并发消费模式ConsumeMode.CONCURRENTLY),使用有序消费模式只是一个简单的总结,我们想一下在生产环境我们几乎都是多节点部署,那是否每个节点都会去消费顺序消息呢?
如果每个节点都去消费顺序消息是无法保证消息顺序的,RocketMQ 底层使用了分布式锁,消费端在初始化的时候,会为自己申请一个分布式锁,只有锁获取成功了,才可以进行消费,而在接下来的消费过程中才是使用单线程消费队列中的消息,这一步也是需要加锁的,通过加锁来保证一个队列只有一个线程来消费,你以为这就完了吗?为了保证消息的顺序,还要对处理的队列加锁,这次加锁是为了确保在重平衡的过程中消息不会被重复消费。

重平衡问题

当我们的消费者集群重新加入了节点的时候,这个时候就会发生重平衡,重平衡之前的队列可能属于 A 节点来消费,假设重平衡之后该队列属于 B 节点来消费,此时节点 A 就需要把自己加在 Broker 上的锁解锁,让节点 B 来申请锁,如果此时节点 A 拉取了消息正在消费但是还没有提交 ACK,而节点 B 马上又去拉取消息消费,此时就可能会出现重复消费的情况,那如何避免这种情况呢,这时就需要通过处理队列上的锁来判断了,只有当处理队列上的锁释放了,其他节点才可以正常消费该消息,这样就避免了消息重复消费。

下面附上一张极客某专栏的一张流程图,过程描述的比较详细,可供参考:

在这里插入图片描述

RocketMQ 顺序消息的问题?

通过上面的介绍,我们可以很明显的发现一个问题,那就是顺序消息的效率较低,顺序消息的实现是通过多次加锁来实现的,这会明显降低消息吞吐率,同时如果前面的消息阻塞了,会导致后面所有的消息阻塞,因此非必要不要使用 RocketMQ 的顺序消息。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2217825.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据-178 Elasticsearch Query - Java API 索引操作 文档操作

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…

芯片设计企业ERP软件如何选择更好

在芯片设计这一高科技领域&#xff0c;高效的企业管理成为推动创新与市场响应速度的关键。ERP(企业资源计划)软件作为企业管理的核心工具&#xff0c;其选择直接关系到企业的运营效率与竞争力。那么&#xff0c;芯片设计企业在面对琳琅满目的ERP软件时&#xff0c;如何做出更优…

【HTML + CSS 魔法秀】打造惊艳 3D 旋转卡片

HTML结构 box 类是整个组件的容器。item-wrap 类是每个旋转卡片的包装器&#xff0c;每个都有一个内联样式–i&#xff0c;用于控制动画的延迟。item类是实际的卡片内容&#xff0c;包含一个图片。 <template><div class"box"><div class"item…

Axure横向菜单高级交互

亲爱的小伙伴&#xff0c;在您浏览之前&#xff0c;烦请关注一下&#xff0c;在此深表感谢&#xff01; 课程主题&#xff1a;横向菜单高级交互 主要内容&#xff1a;横向菜单左右拖动、选中效果 应用场景&#xff1a;app横向菜单、pc后台动态区域 案例展示&#xff1a; 演…

ThreadLocal源码详解

目录 Thread、ThreadLocalMap 、ThreadLocal关系 ThreadLocal中的get、Set方法 ThreadLocal 内存泄露问题 Thread、ThreadLocalMap 、ThreadLocal关系 从源码可以看出&#xff1a;Thread类中有成员变量ThreadLocalMap&#xff0c;ThreadLocalMap类中有成员变量Entry[]数组&a…

Spring Cache Caffeine 高性能缓存库

​ Caffeine 背景 Caffeine是一个高性能的Java缓存库&#xff0c;它基于Guava Cache进行了增强&#xff0c;提供了更加出色的缓存体验。Caffeine的主要特点包括&#xff1a; 高性能&#xff1a;Caffeine使用了Java 8最新的StampedLock乐观锁技术&#xff0c;极大地提高了缓存…

buffer/cache内存优化_posix_fadvise_主动释放读缓存cache

1.问题现象 1.htop free命令发现系统 buffer/cache 内存占用高 free -h total used free shared buff/cache available Mem: 61Gi 15Gi 569Mi 1.7Gi 45Gi 43Gi Swap: 30Gi 0.0Ki 30Gi cat /proc/meminfo or grep -E "Buff|Cache" /proc/meminfo Buffers: 370568 kB …

Linux 进程终止和进程等待

目录 0.前言 1. 进程终止 1.1 进程退出的场景 1.2 进程常见退出方法 1.2.1 正常退出 1.2.2 异常退出 2. 进程等待 2.1 进程等待的重要性 2.2 进程等待的方法 2.2.1 wait() 方法 2.2.2 waitpid() 方法 2.3 获取子进程 status 2.4 阻塞等待和非阻塞等待 2.4.1 阻塞等待 2.4.2 非阻…

拼三角问题

欢迎来到杀马特的主页&#xff1a;羑悻的小杀马特.-CSDN博客 目录 一题目&#xff1a; 二思路&#xff1a; 三解答代码&#xff1a; 一题目&#xff1a; 题目链接&#xff1a; 登录—专业IT笔试面试备考平台_牛客网 二思路&#xff1a; 思路&#xff1a;首先明白能组成三角形…

php的echo和print输出语句⑥

在 PHP 中有两个基本的输出方式&#xff1a; echo 和 print。 echo 和 print 区别: echo : 可以输出一个或多个字符串 print : 只允许输出一个字符串。 提示&#xff1a;echo 输出的速度比 print 快&#xff0c; echo 没有返回值&#xff0c;print有返回值1。 <?php …

【赵渝强老师】Oracle的联机重做日志文件与数据写入过程

在Oracle数据库中&#xff0c;一个数据库可以有多个联机重做日志文件&#xff0c;它记录了数据库的变化。例如&#xff0c;当Oracle数据库产生异常时&#xff0c;导致对数据的改变没有及时写入到数据文件中。这时Oracle数据库就会根据联机重做日志文件中的信息来获得数据库的变…

Submariner 服务更新同步测试

测试服务更新同步问题 在集群1 部署 nginx1服务&#xff0c;导出服务&#xff0c;分配的虚拟 IP 为 100.1.255.253 在其他集群检测 serviceimport &#xff0c;可以检测到 nginx1 服务对应的 serviceimport 正常情况下的 serviceexport 如果删除 service 或者 删除 serviceexp…

OpenAI Canvas用户反馈:并不如外界传言般“炸裂”,更不是“AGI的终极交互形态” | LeetTalk Daily...

“LeetTalk Daily”&#xff0c;每日科技前沿&#xff0c;由LeetTools AI精心筛选&#xff0c;为您带来最新鲜、最具洞察力的科技新闻。 Canvas作为一个独立的界面&#xff0c;通过与ChatGPT的结合来提升用户的协作能力和创作效率。尽管用户对其独立性与现有工具的整合存在不同…

闯关leetcode——112. Path Sum

大纲 题目地址内容 解题代码地址 题目 地址 https://github.com/f304646673/leetcode/tree/main/112-Path-Sum 内容 Given the root of a binary tree and an integer targetSum, return true if the tree has a root-to-leaf path such that adding up all the values alo…

OpenCV高级图形用户界面(17)设置一个已经创建的滚动条的最小值函数setTrackbarMin()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::setTrackbarMin 这个函数的作用就是设置指定窗口中轨迹条的最小位置。这使得开发者能够在程序运行时动态地调整轨迹条的范围&#xff0c;而不…

基于STM32的风速风向传感器设计

引言 本项目设计了一个基于STM32的风速和风向传感器系统&#xff0c;能够通过组合使用旋转式风速传感器和电子罗盘&#xff0c;实时测量风速和风向&#xff0c;并将数据通过显示屏或无线模块发送给用户。该系统适用于气象监测、环境监控、农业自动化等场景&#xff0c;具有准确…

微信好友变顾客,7天成效的秘诀

在如今的社交媒体时代&#xff0c; 微信不仅是沟通工具&#xff0c;更是商业营销的重要平台。很多人拥有大量的微信好友&#xff0c;但成交的客户很少&#xff1f;以下四个有效的社交销售秘诀&#xff0c;帮助你在7天内实现转化。 01保持耐心&#xff0c;合理安排跟进时间 在销…

Springboot 整合 Java DL4J 实现安防监控系统

&#x1f9d1; 博主简介&#xff1a;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编程&#xff0c;…

【网络安全】-vulnhub靶场-noob

1.靶机下载&#xff1a; https://www.vulnhub.com/entry/noob-1,746/ 得到ova文件导入虚拟机&#xff0c;并打开虚拟机设置&#xff0c;将靶机-Noob与攻击机-kali的网络适配器都改成NAT仅主机模式&#xff0c;确保两台虚拟机在同一网段上。 2.靶机-Noob ip 判断 命令&#x…

[Vue3核心语法] ref、reactive响应式数据

定义: ref用来定义&#xff1a;基本类型数据、对象类型数据&#xff1b; reactive用来定义&#xff1a;对象类型数据。 使用原则: 若需要一个基本类型的响应式数据&#xff0c;必须使用ref。 若需要一个响应式对象&#xff0c;层级不深&#xff0c;ref、reactive都可以。 …