Kafka - 消息乱序问题的常见解决方案和实现

news2024/12/15 14:01:24

文章目录

  • 概述
  • 一、MQ消息乱序问题分析
    • 1.1 相同topic内的消息乱序
    • 1.2 不同topic的消息乱序
  • 二、解决方案
    • 方案一: 顺序消息
      • Kafka
        • 1. Kafka 顺序消息的实现
          • 1.1 生产者:确保同一业务主键的消息发送到同一个分区
          • 1.2 消费者:顺序消费消息
        • 2. Kafka 顺序消息实现的局限性
        • 3. 小结
      • RocketMQ
        • 1. 使用 RocketMQ 实现顺序消费
          • 1.1 生产者:发送顺序消息
          • 1.2 消费者:顺序消费消息
        • 2. RocketMQ 顺序消息的局限性
        • 3. 小结
    • 方案二: 前置检测(Pre-check)
      • 前置检测的方案
      • 方案1: 使用辅助表进行前置检测
        • 1.1 方案设计
        • 1.2 数据库表设计
        • 1.3 消费者前置检测代码实现
      • 方案2: 使用序列号/时间戳进行顺序检查
        • 2.1 方案设计
        • 2.2 消费者前置检测代码实现
      • 3. 小结
    • 方案三: 状态机
      • 1. 状态机的设计思路
      • 2. 状态机的实现步骤
      • 3. 设计与实现
      • 3.1 状态机设计
        • 3.1.1 定义状态
        • 3.1.2 定义事件
        • 3.1.3 状态机逻辑
        • 3.1.4 使用状态机处理消息
      • 4. 运行流程
      • 5. 小结
    • 监控与报警
      • 伪实现
  • 总结

在这里插入图片描述


概述

在分布式系统中,消息队列(MQ)作为实现系统解耦和异步通信的重要工具,广泛应用于各种业务场景。然而,消息消费时出现的乱序问题,常常会对业务逻辑的正确执行和系统稳定性产生不良影响。

接下来我们将详细探讨MQ消息乱序问题的根源,并提供一系列在实际应用中可行的解决方案,包括顺序消息、前置检测、状态机等方式

在这里插入图片描述

一、MQ消息乱序问题分析

1.1 相同topic内的消息乱序

  • 并发消费:为了提高消息处理吞吐量,通常会配置多个消费者实例来并发消费同一个队列中的消息。然而,由于消费者实例的性能差异,可能导致消息的消费顺序与发送顺序不一致。
  • 消息分区:MQ系统通常采用分区化设计,当同一业务逻辑的消息分发到不同的分区时,可能出现乱序。
  • 网络延迟与抖动:消息在传输过程中可能会受到网络延迟和抖动的影响,导致消息到达消费者端的顺序与发送顺序不一致。
  • 消息重试与故障恢复:当消费者处理消息失败或出现故障时,重试机制或故障恢复操作不当,也可能导致消息乱序。

1.2 不同topic的消息乱序

例如,系统A在01:00时向TopicA发送了消息msgA-01:00,而系统B在01:01时向TopicB发送了消息msgB-01:01。消费者无法预设msgA-01:00必然先于msgB-01:01被接收。消息系统中的分区策略、消费者的处理能力、网络等因素共同导致无法确保消息遵循严格的先进先出(FIFO)原则。


二、解决方案

为了应对消息乱序问题,有几种常见的解决方案,包括顺序消息、前置检测、状态机等。

方案一: 顺序消息

顺序消息是通过确保同一业务主键的消息发送到同一分区,从而保证消息的顺序性

Kafka

Kafka 为例,虽然它不保证全局消息顺序,但可以通过合理的分区策略和消息键来确保消息的局部顺序性。

下面是使用 Kafka 作为消息队列(MQ)时,如何实现顺序消息的解决方案。通过使用 Kafka 的分区策略和消息键(key),可以确保同一业务主键的消息发送到同一个分区,从而保证消息的顺序性。

1. Kafka 顺序消息的实现
1.1 生产者:确保同一业务主键的消息发送到同一个分区

通过指定消息的 key,Kafka 会确保具有相同 key 的消息发送到同一个分区。这样,即使多个消费者并行消费,也能保证消息在同一个分区内的顺序。

生产者代码实现

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class OrderProducer {

    private final KafkaProducer<String, String> producer;
    private final String topic;

    public OrderProducer(String topic) {
        this.topic = topic;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());

        this.producer = new KafkaProducer<>(properties);
    }

    public void sendOrderMessage(String orderId, String orderMessage) {
        // 使用订单ID作为消息的 key,以确保同一订单的消息发送到同一个分区
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, orderId, orderMessage);
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("Message sent: " + metadata);
            }
        });
    }

    public void close() {
        producer.close();
    }

    public static void main(String[] args) {
        OrderProducer orderProducer = new OrderProducer("order-topic");

        // 发送顺序消息,确保同一订单的消息被发送到同一分区
        orderProducer.sendOrderMessage("order123", "Order Created");
        orderProducer.sendOrderMessage("order123", "Order Paid");
        orderProducer.sendOrderMessage("order123", "Order Shipped");

        // 发送另一个订单的消息
        orderProducer.sendOrderMessage("order456", "Order Created");
        orderProducer.sendOrderMessage("order456", "Order Paid");

        orderProducer.close();
    }
}
  • 在生产者端,通过 ProducerRecord 发送消息时,设置了消息的 key 为订单 ID(orderId)。Kafka 会使用该 key 来确定消息发送到哪个分区,从而确保同一订单的所有消息都会被发送到同一个分区,保证顺序。
  • producer.send() 方法的回调函数用来处理消息发送的异步结果。

1.2 消费者:顺序消费消息

消费者使用 MessageListenerConsumer 来消费消息。Kafka 默认会根据分区消费顺序保证同一分区内消息的顺序。我们只需要保证同一个业务的消息被路由到同一个分区,消费者就能顺序消费这些消息。

消费者代码实现

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class OrderConsumer {

    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    public OrderConsumer(String topic) {
        this.topic = topic;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "order-consumer-group");
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("auto.offset.reset", "earliest");

        this.consumer = new KafkaConsumer<>(properties);
    }

    public void consumeMessages() {
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            consumer.poll(1000).forEach(record -> {
                // 处理顺序消息
                System.out.println("Consumed message: " + record.key() + " - " + record.value());
            });
        }
    }

    public void close() {
        consumer.close();
    }

    public static void main(String[] args) {
        OrderConsumer orderConsumer = new OrderConsumer("order-topic");

        // 消费消息,确保同一个订单的消息顺序消费
        orderConsumer.consumeMessages();
    }
}
  • 消费者通过 KafkaConsumer 从指定的 topic 中拉取消息。在这种实现中,消息会按照 Kafka 内部的消费机制被顺序消费。

  • consumer.poll() 方法定期从 Kafka 中拉取消息,并根据 key 分配到相应的分区进行消费。

  • Kafka 的分区是顺序消费的,即每个分区内的消息按照生产者发送的顺序消费。因此,通过确保同一订单的消息使用相同的 key,就能保证同一分区内消息的消费顺序。

2. Kafka 顺序消息实现的局限性
  1. 局部顺序保证:Kafka 只能保证同一分区内的消息顺序,对于跨分区的消息并不保证顺序。因此,确保同一业务的消息发送到同一分区非常关键。
  2. 性能与吞吐量:为了提高系统的吞吐量和并发能力,Kafka 会对 topic 进行分区。分区数过多可能影响顺序性,但可以通过合理设计业务键来平衡性能和顺序性要求。
3. 小结

通过使用 Kafka 的分区和消息键机制,我们可以确保同一业务主键的消息在同一分区内顺序消费。这种方法适用于需要保证顺序性的场景,如订单处理等。生产者确保消息按照业务主键路由到同一分区,消费者则按分区顺序消费消息,从而避免消息乱序的问题。


RocketMQ

在使用 RocketMQ 作为消息队列时,确保消息的顺序消费可以通过 顺序消息Ordered Message)的特性来实现。RocketMQ 支持两种类型的顺序消费:局部顺序(确保同一消息队列内的消息顺序)和 全局顺序(通过单一队列保证全局顺序,但在高并发情况下可能会影响性能)。

1. 使用 RocketMQ 实现顺序消费
1.1 生产者:发送顺序消息

生产者通过指定消息的 key 来确保具有相同 key 的消息被发送到同一个消息队列,从而保证顺序性。RocketMQ 支持发送顺序消息的 API,通过 MessageQueueSelector 来指定消息发送到哪个队列。

生产者代码实现

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class OrderProducer {

    private DefaultMQProducer producer;

    public OrderProducer(String groupName) throws Exception {
        // 创建生产者实例
        producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址
        producer.start();
    }

    public void sendOrderMessage(String orderId, String orderMessage) throws Exception {
        // 创建消息实例
        Message message = new Message("OrderTopic", "OrderTag", orderMessage.getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 使用订单ID作为消息的key,确保同一订单的消息发送到同一队列
        SendResult sendResult = producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                String orderId = (String) arg;
                int queueIndex = orderId.hashCode() % mqs.size(); // 根据订单ID选择队列
                return mqs.get(queueIndex);
            }
        }, orderId);

        System.out.println("Message sent: " + sendResult);
    }

    public void close() {
        producer.shutdown();
    }

    public static void main(String[] args) throws Exception {
        OrderProducer producer = new OrderProducer("order-group");

        // 发送顺序消息,确保同一订单的消息被发送到同一队列
        producer.sendOrderMessage("order123", "Order Created");
        producer.sendOrderMessage("order123", "Order Paid");
        producer.sendOrderMessage("order123", "Order Shipped");

        producer.sendOrderMessage("order456", "Order Created");
        producer.sendOrderMessage("order456", "Order Paid");

        producer.close();
    }
}
  • 生产者通过 MessageQueueSelector 来确保相同 key 的消息被发送到相同的队列。这里我们使用 orderId 作为消息的 key,通过计算 orderId.hashCode() 来决定消息发送到哪个队列。确保同一个订单的消息发送到同一个队列,从而在消费时保持顺序性。
  • SendResult 会返回发送结果,包括消息发送的状态。

1.2 消费者:顺序消费消息

在消费者端,RocketMQ 提供了 MessageListenerOrderly 接口来实现顺序消费。该接口保证在同一队列内,消息会按照发送的顺序被消费。

消费者代码实现

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeConcurrentlyContext;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;

import java.util.List;

public class OrderConsumer {

    private DefaultMQPushConsumer consumer;

    public OrderConsumer(String groupName) throws Exception {
        // 创建消费者实例
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址
        consumer.subscribe("OrderTopic", "*"); // 订阅指定的 topic 和 tag
    }

    public void consumeMessages() throws Exception {
        // 设置顺序消费监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    // 消费顺序消息
                    System.out.println("Consumed message: " + new String(msg.getBody()));
                }
                return ConsumeOrderlyContext.SUCCESS;
            }
        });

        consumer.start();
    }

    public void close() {
        consumer.shutdown();
    }

    public static void main(String[] args) throws Exception {
        OrderConsumer consumer = new OrderConsumer("order-consumer-group");

        // 开始消费顺序消息
        consumer.consumeMessages();
    }
}
  • 消费者使用 MessageListenerOrderly 来实现顺序消费。该接口保证了消费者在同一消息队列内按顺序消费消息。

  • 消费者在接收到消息后,会依次消费并输出消息内容。

  • RocketMQ 是基于消息队列的,每个队列内的消息是顺序消费的,即使有多个消费者,也只会有一个消费者消费某个队列的消息。通过将同一 key 的消息发送到同一个队列,可以确保这些消息按照顺序被消费。

  • 需要注意的是,RocketMQ 保证的是 局部顺序,即同一队列内的消息按照发送顺序消费。对于多个队列和多个消费者,只有同一个队列内的消息顺序是保证的。


2. RocketMQ 顺序消息的局限性
  • 局部顺序保证:RocketMQ 只能保证同一队列内的消息顺序,对于多个队列之间的消息没有顺序保证。
  • 性能影响:如果需要保证全局顺序,可能需要将所有消息都发送到同一个队列,这会影响性能,导致吞吐量下降。通常需要在性能和顺序性之间进行权衡。
3. 小结

通过使用 RocketMQMessageQueueSelectorMessageListenerOrderly,我们可以保证同一业务的消息在同一队列内顺序消费。这种方式适用于需要保证顺序的场景,如订单处理、支付等高可靠性的业务系统。生产者通过业务主键选择队列,消费者则顺序消费消息,确保数据一致性和业务流程的正确执行。


方案二: 前置检测(Pre-check)

前置检测(Pre-check)在消息队列消费中,常用于确保消息消费的顺序性,防止因为消息乱序导致的数据不一致或业务错误。其核心思想是在消息消费之前进行验证,确保前置条件满足才继续消费当前消息

在消费者处理消息之前,进行前置条件检查,确保上一条消息已成功消费。这可以通过消息辅助表来实现,或者在消息中附带序列号、时间戳等信息进行验证。

前置检测的方案

前置检测主要包括以下几种常见方法:

  1. 使用辅助表进行状态检查:通过创建一个辅助表(如状态表或消息表),记录消息的状态,消费者可以通过查询该表来验证上一个消息是否已经成功处理,确保消息按顺序消费。

  2. 使用序列号/时间戳进行顺序检查:在消息中包含序列号或时间戳,消费者根据这些信息判断当前消息是否按预期顺序到达。如果不符合顺序,则将当前消息暂时缓存,等待前一个消息处理完成。

  3. 利用死信队列处理无序消息:当消息的顺序不符合预期时,可以将这些消息暂时放入死信队列(DLQ)中,待前置消息消费成功后再重新消费。

方案1: 使用辅助表进行前置检测

假设在处理订单相关的消息时,我们希望确保订单的状态始终按照正确的顺序处理,比如,Order Created 应该在 Order Paid 前消费。

1.1 方案设计
  • 设计一个 order_status 表,记录订单的处理状态。
  • 消费者在处理消息前,查询这个表,确保订单的前置状态已经处理完毕。
  • 消费失败时,可以将消息放入死信队列或重试。
1.2 数据库表设计
CREATE TABLE order_status (
    order_id VARCHAR(255) PRIMARY KEY,
    status VARCHAR(255) NOT NULL,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 状态示例
-- 订单创建:CREATED
-- 订单支付:PAID
-- 订单完成:COMPLETED
1.3 消费者前置检测代码实现
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;
import java.util.List;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class OrderConsumerWithPreCheck {

    private static final String DB_URL = "jdbc:mysql://localhost:3306/order_db";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "password";
    
    private DefaultMQPushConsumer consumer;

    public OrderConsumerWithPreCheck(String groupName) throws Exception {
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址
        consumer.subscribe("OrderTopic", "*"); // 订阅指定的 topic 和 tag
    }

    // 检查订单状态
    public boolean checkOrderStatus(String orderId, String expectedStatus) throws Exception {
        try (Connection connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {
            String query = "SELECT status FROM order_status WHERE order_id = ?";
            try (PreparedStatement statement = connection.prepareStatement(query)) {
                statement.setString(1, orderId);
                ResultSet rs = statement.executeQuery();
                if (rs.next()) {
                    String currentStatus = rs.getString("status");
                    return expectedStatus.equals(currentStatus); // 比对期望状态
                }
            }
        }
        return false; // 订单未找到,默认返回 false
    }

    public void consumeMessages() throws Exception {
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    String orderId = msg.getKeys();  // 假设订单ID存储在消息的keys字段
                    String currentStatus = new String(msg.getBody());
                    
                    // 检查前置状态,确保当前状态是顺序的
                    try {
                        if ("OrderCreated".equals(currentStatus)) {
                            if (!checkOrderStatus(orderId, "CREATED")) {
                                System.out.println("Order not created yet, skipping message: " + orderId);
                                continue;  // 如果前置状态不符合,跳过该消息
                            }
                        } else if ("OrderPaid".equals(currentStatus)) {
                            if (!checkOrderStatus(orderId, "PAID")) {
                                System.out.println("Order not paid yet, skipping message: " + orderId);
                                continue;
                            }
                        }

                        // 消费消息逻辑
                        System.out.println("Processing order message: " + orderId + " - " + currentStatus);
                        // 更新状态或其他业务逻辑
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeOrderlyContext.SUCCESS;
            }
        });

        consumer.start();
    }

    public void close() {
        consumer.shutdown();
    }

    public static void main(String[] args) throws Exception {
        OrderConsumerWithPreCheck consumer = new OrderConsumerWithPreCheck("order-consumer-group");
        consumer.consumeMessages();
    }
}

方案2: 使用序列号/时间戳进行顺序检查

在这种方法中,我们为每个消息分配一个 序列号时间戳,并通过对比当前消息的序列号和前一条消息的序列号来确保消息按顺序消费。如果序列号不符合预期,消费者会将该消息缓存,等待前置消息的消费完成。

2.1 方案设计
  • 消息中包含一个 sequenceIdtimestamp 字段。
  • 消费者检查当前消息的 sequenceId,如果当前消息的 sequenceId 小于等于上一个已消费消息的 sequenceId,则跳过当前消息。
2.2 消费者前置检测代码实现
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class OrderConsumerWithSequenceCheck {

    private DefaultMQPushConsumer consumer;
    private AtomicInteger lastSequenceId = new AtomicInteger(0);  // 记录最后处理的序列号

    public OrderConsumerWithSequenceCheck(String groupName) throws Exception {
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr("localhost:9876"); // RocketMQ服务器地址
        consumer.subscribe("OrderTopic", "*"); // 订阅指定的 topic 和 tag
    }

    // 检查消息的序列号,确保顺序性
    public boolean checkSequenceId(int currentSequenceId) {
        return currentSequenceId == lastSequenceId.incrementAndGet();
    }

    public void consumeMessages() throws Exception {
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    int sequenceId = Integer.parseInt(new String(msg.getBody())); // 消息中的序列号
                    if (!checkSequenceId(sequenceId)) {
                        System.out.println("Out of order message, skipping message with sequence: " + sequenceId);
                        continue;  // 如果消息序列号不符合顺序,则跳过
                    }

                    // 消费消息逻辑
                    System.out.println("Processing message with sequence ID: " + sequenceId);
                    // 进行相应的业务处理
                }
                return ConsumeOrderlyContext.SUCCESS;
            }
        });

        consumer.start();
    }

    public void close() {
        consumer.shutdown();
    }

    public static void main(String[] args) throws Exception {
        OrderConsumerWithSequenceCheck consumer = new OrderConsumerWithSequenceCheck("order-consumer-group");
        consumer.consumeMessages();
    }
}

3. 小结

前置检测方案的核心是通过验证当前消息的处理条件(如订单的状态或消息的序列号),确保前置条件满足后再继续处理当前消息。此方案能有效防止由于消息乱序导致的数据不一致或业务错误,适用于需要严格保证处理顺序的场景。

  • 数据库检查:通过查询数据库记录来验证消息的处理顺序。
  • 序列号检查:通过消息中的序列号或时间戳验证消息是否按顺序到达。

方案三: 状态机

可以利用状态机来管理消息的消费顺序和状态。状态机的核心思想是定义系统的不同状态,以及触发状态变更的事件,从而确保消息在正确的状态下被处理。

通过引入状态机,我们能够:

  • 通过状态转移机制保证消息按顺序消费。
  • 在状态转移过程中,避免非法的状态变更和消息丢失。

1. 状态机的设计思路

在处理消息时,可以将消息消费的过程视为一系列状态的变换。每个消息会根据其当前状态决定是否可以进行处理。

  1. 定义状态

    • 定义消息消费的不同状态,例如 PENDING(待处理)、PROCESSING(处理中)、PROCESSED(已处理)。
    • 每个消息在处理过程中会从一个状态转移到另一个状态。
  2. 定义事件

    • 每个消息可能触发一个事件,事件可以是消息的到达或者某些外部条件的变化。
    • 通过事件来决定状态的转移。
  3. 处理顺序

    • 确保某些消息必须在特定的顺序下处理。比如,某个状态的消息必须先处理完成,才能处理下一个状态的消息。

2. 状态机的实现步骤

  • 状态定义:使用枚举类(enum)定义消息的状态。
  • 事件定义:根据消息到达的顺序或其他外部条件触发不同的事件。
  • 状态机实现:根据当前状态和事件的触发来决定状态转移。

3. 设计与实现

假设我们有一个订单处理系统,订单的状态可能为以下几种:

  • ORDER_CREATED:订单已创建
  • ORDER_PAID:订单已支付
  • ORDER_SHIPPED:订单已发货
  • ORDER_COMPLETED:订单已完成

我们希望确保消息的消费顺序是按顺序进行的,即订单创建 -> 支付 -> 发货 -> 完成。

3.1 状态机设计

3.1.1 定义状态

首先定义订单状态的枚举类型 OrderState

public enum OrderState {
    ORDER_CREATED,  // 订单已创建
    ORDER_PAID,     // 订单已支付
    ORDER_SHIPPED,  // 订单已发货
    ORDER_COMPLETED // 订单已完成
}
3.1.2 定义事件

根据业务需求,定义事件触发的条件。比如:

  • ORDER_CREATED_EVENT:订单创建事件
  • ORDER_PAID_EVENT:订单支付事件
  • ORDER_SHIPPED_EVENT:订单发货事件
  • ORDER_COMPLETED_EVENT:订单完成事件
3.1.3 状态机逻辑

使用一个状态机类来管理状态的转换。状态机会根据当前状态和触发的事件来进行状态转换。

import java.util.HashMap;
import java.util.Map;

public class OrderStateMachine {

    // 订单状态
    private OrderState currentState;

    // 状态转移规则,基于当前状态和事件决定下一个状态
    private final Map<OrderState, Map<String, OrderState>> transitionTable;

    public OrderStateMachine() {
        // 初始化状态为 ORDER_CREATED
        this.currentState = OrderState.ORDER_CREATED;

        // 初始化状态转移规则
        this.transitionTable = new HashMap<>();

        // 设置转移规则
        // 从 ORDER_CREATED 可以转到 ORDER_PAID
        addTransition(OrderState.ORDER_CREATED, "ORDER_CREATED_EVENT", OrderState.ORDER_PAID);

        // 从 ORDER_PAID 可以转到 ORDER_SHIPPED
        addTransition(OrderState.ORDER_PAID, "ORDER_PAID_EVENT", OrderState.ORDER_SHIPPED);

        // 从 ORDER_SHIPPED 可以转到 ORDER_COMPLETED
        addTransition(OrderState.ORDER_SHIPPED, "ORDER_SHIPPED_EVENT", OrderState.ORDER_COMPLETED);
    }

    // 添加状态转换规则
    private void addTransition(OrderState fromState, String event, OrderState toState) {
        transitionTable.putIfAbsent(fromState, new HashMap<>());
        transitionTable.get(fromState).put(event, toState);
    }

    // 处理事件并转换状态
    public boolean handleEvent(String event) {
        Map<String, OrderState> transitions = transitionTable.get(currentState);
        if (transitions != null && transitions.containsKey(event)) {
            OrderState nextState = transitions.get(event);
            System.out.println("State transition: " + currentState + " -> " + nextState);
            this.currentState = nextState; // 执行状态转移
            return true;
        } else {
            System.out.println("Invalid event for the current state: " + currentState);
            return false;
        }
    }

    // 获取当前状态
    public OrderState getCurrentState() {
        return currentState;
    }
}
3.1.4 使用状态机处理消息

假设我们在消息队列中有不同的订单消息,需要按顺序消费。我们将消费者与状态机结合使用,确保消息按照正确的顺序消费。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.consumer.ConsumeOrderlyContext;

import java.util.List;

public class OrderConsumerWithStateMachine {

    private static final String TOPIC = "OrderTopic";
    private static final String GROUP = "OrderConsumerGroup";

    private DefaultMQPushConsumer consumer;
    private OrderStateMachine stateMachine;

    public OrderConsumerWithStateMachine() {
        consumer = new DefaultMQPushConsumer(GROUP);
        stateMachine = new OrderStateMachine();

        try {
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe(TOPIC, "*");
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyContext consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    for (MessageExt msg : msgs) {
                        String event = new String(msg.getBody());
                        System.out.println("Received message: " + event);

                        // 根据消息的内容触发状态机事件
                        if ("ORDER_CREATED_EVENT".equals(event)) {
                            stateMachine.handleEvent("ORDER_CREATED_EVENT");
                        } else if ("ORDER_PAID_EVENT".equals(event)) {
                            stateMachine.handleEvent("ORDER_PAID_EVENT");
                        } else if ("ORDER_SHIPPED_EVENT".equals(event)) {
                            stateMachine.handleEvent("ORDER_SHIPPED_EVENT");
                        } else if ("ORDER_COMPLETED_EVENT".equals(event)) {
                            stateMachine.handleEvent("ORDER_COMPLETED_EVENT");
                        }

                        System.out.println("Current state: " + stateMachine.getCurrentState());
                    }
                    return ConsumeOrderlyContext.SUCCESS;
                }
            });

            consumer.start();
            System.out.println("Order consumer started");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new OrderConsumerWithStateMachine();
    }
}

4. 运行流程

  1. 消费者会根据事件(如 ORDER_CREATED_EVENT, ORDER_PAID_EVENT 等)处理消息。
  2. 消费者会触发状态机,状态机会根据当前状态和事件来进行状态转换。
  3. 如果消息的顺序不正确(例如 ORDER_PAID_EVENTORDER_CREATED_EVENT 之前到达),状态机会拒绝处理,并打印 Invalid event for the current state

5. 小结

  • 状态机可以帮助管理消息的消费顺序,确保在处理消息时遵循正确的流程和业务逻辑。
  • 通过定义状态和事件,状态机提供了一个清晰的框架来管理复杂的消息处理过程。
  • 结合消息队列,状态机可以有效地控制消息的顺序消费,避免乱序带来的问题。

监控与报警

建立系统的监控和报警机制,及时发现并处理消息错乱等异常情况。通过设定阈值或检测规则,监控系统的消息流转,确保及时响应并纠正问题。

  • 定期监控消息队列的消费进度,若发现消费滞后或消息顺序异常,自动报警。
  • 通过日志和统计信息,捕获异常并自动触发处理流程。

伪实现

public class MessageMonitor {
    private static final Logger logger = LoggerFactory.getLogger(MessageMonitor.class);

    public void monitorMessageQueue() {
        // 假设有一个队列监控机制
        boolean isOutOfOrder = checkMessageOrder();

        if (isOutOfOrder) {
            logger.error("Message order error detected, triggering alert!");
            // 触发报警或采取恢复措施
        }
    }

    private boolean checkMessageOrder() {
        // 检查消息顺序是否正常
        return false; // 假设没有乱序
    }
}

总结

MQ消息乱序是分布式系统中常见的挑战,直接影响到系统的稳定性和业务一致性。我们可以通过顺序消息、前置检测、状态机等解决方案, 保证消息的顺序性,提高系统的可靠性和用户体验。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2259962.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[MoeCTF 2021]unserialize

[广东强网杯 2021 团队组]欢迎参加强网杯 这题简单&#xff0c;flag直接写在脸上 NSSCTF {Wec10m3_to_QwbCtF} [MoeCTF 2021]unserialize <?phpclass entrance {public $start;function __construct($start){// 构造函数初始化 $start 属性$this->start $start;}fun…

舌头分割数据集labelme格式2557张1类别

数据集格式&#xff1a;labelme格式(不包含mask文件&#xff0c;仅仅包含jpg图片和对应的json文件) 图片数量(jpg文件个数)&#xff1a;2557 标注数量(json文件个数)&#xff1a;2557 标注类别数&#xff1a;1 标注类别名称:["tongue"] 每个类别标注的框数&#xff1…

回归预测 | Matlab实现基于BiLSTM-Adaboost双向长短期记忆神经网络结合Adaboost集成学习回归预测

目录 效果一览基本介绍模型设计程序设计参考资料效果一览 基本介绍 回归预测 | Matlab实现基于BiLSTM-Adaboost双向长短期记忆神经网络结合Adaboost集成学习回归预测 模型设计 基于BiLSTM-Adaboost的回归预测模型结合了双向长短期记忆神经网络(BiLSTM)和Adaboost集成学习的…

Unity学习笔记(二)如何制作角色动画

前言 本文为Udemy课程The Ultimate Guide to Creating an RPG Game in Unity学习笔记 创建一个角色 我们的目的是创建一个可移动、跳跃、冲刺等动作的角色 需要的组件&#xff1a;Rigidbody&#xff08;用于创建物理规则&#xff09;、Collider&#xff08;用于检测碰撞&am…

嵌入式入门Day30

IO Day5 线程相关函数pthread_createpthread_selfpthread_exitpthread_join\pthread_detachpthread_cancelpthread_setcancelstatepthread_setcanceltype 作业 线程 线程是轻量化的进程&#xff0c;一个进程内可以有多个线程&#xff0c;至少包含一个线程&#xff08;主线程&a…

【Ubuntu】双硬盘安装双系统 Windows 和 Ubuntu

【Ubuntu】双硬盘安装双系统 Windows 和 Ubuntu 1 安装顺序2 Ubutnu 20.042.1 准备工作2.2 自定义分区2.3 遇到的一些问题 1 安装顺序 我选择先在一块 SSD 上安装 Windows 再在另一块 SSD 上安装 Ubuntu&#xff0c;建议先安装 Windows 2 Ubutnu 20.04 2.1 准备工作 制作启…

【Qt】QWidget中的常见属性及其功能(一)

目录 一、 enabled 例子&#xff1a; 二、geometry 例子&#xff1a; window fram 例子 &#xff1a; 四、windowTiltle 五、windowIcon 例子&#xff1a; qrc机制 创建qrc文件 例子&#xff1a; qt中的很多内置类都是继承自QWidget的&#xff0c;因此熟悉QWidget的…

iOS swift开发系列 -- tabbar问题总结

1.单视图如何改为tabbar&#xff0c;以便显示2个标签页 右上角➕&#xff0c;输入tabbar 找到控件&#xff0c;然后选中&#xff0c;把entrypoint移动到tabbar控件 2.改成tabbar&#xff0c;生成两个item&#xff0c;配置各自视图后&#xff0c;启动发现报错 Thread 1: “-[p…

Muduo网络库解析--网络模块(2)

前文 重写Muduo库实现核心模块的Git仓库 注&#xff1a;本文将重点剖析 Muduo 网络库的核心框架&#xff0c;深入探讨作者精妙的代码设计思路&#xff0c;并针对核心代码部分进行重写&#xff0c;将原本依赖 boost 的实现替换为原生的 C11 语法。需要说明的是&#xff0c;本文…

电脑怎么设置通电自动开机(工控机)

操作系统&#xff1a;win10 第一步&#xff0c;电脑开机时按del键进入bios页面。 第二步&#xff0c;选择advanced下的IT8712 Super IO Configuration 第三步&#xff0c;找到Auto Power On&#xff0c;将其从Power off设置为Power On 第四步&#xff0c;F10保存&#xff0c;大…

如何对小型固定翼无人机进行最优的路径跟随控制?

控制架构 文章继续采用的是 ULTRA-Extra无人机&#xff0c;相关参数如下&#xff1a; 这里用于guidance law的无人机运动学模型为&#xff1a; { x ˙ p V a cos ⁡ γ cos ⁡ χ V w cos ⁡ γ w cos ⁡ χ w y ˙ p V a cos ⁡ γ sin ⁡ χ V w cos ⁡ γ w sin ⁡ χ…

基于Redis实现令牌桶算法

基于Redis实现令牌桶算法 令牌桶算法算法流程图优点缺点 实现其它限流算法 令牌桶算法 令牌桶是一种用于分组交换和电信网络的算法。它可用于检查数据包形式的数据传输是否符合定义的带宽和突发性限制&#xff08;流量不均匀或变化的衡量标准&#xff09;。它还可以用作调度算…

学习threejs,局部纹理刷新,实现图片分块加载

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️Texture 贴图 二、&#x1…

超标量处理器设计笔记(10) 寄存器重命名过程的恢复、分发

重命名 寄存器重命名过程的恢复使用 Checkpoint 对 RAT 进行恢复使用 WALK 对 RAT 进行恢复使用 Architecture State 对 RAT 进行恢复总结 分发&#xff08;Dispatch&#xff09; 寄存器重命名过程的恢复 当发生异常、分支预测失败时&#xff0c;指令占用 RAT、ROB 和 Issue …

海康萤石摄像机接入EasyNVR流程:开启RTSP-》萤石视频添加到EasyNVR-》未来支持海康SDK协议添加到EasyNVR

EasyNVR目前支持GB28181、RTSP、ONVIF、RTMP&#xff08;推流&#xff09;这几种协议接入&#xff0c;目前正在增加海康HIKSDK、大华DHSDK等几种SDK的接入&#xff0c;我们今天就介绍一下萤石摄像机怎么通过RTSP接入到EasyNVR。 第一步&#xff1a;萤石摄像机开启 萤石设备默…

Qt编写的文件传输工具

使用QT编写的文件传输工具 文件传输工具通过发送udp广播消息将IP广播给其他开启该程序的局域网机器 文件传输工具 通过发送udp广播消息将IP广播给其他开启该程序的局域网机器 收到的广播消息可以显示在IP地址列表中&#xff0c;点击IP地址可以自动填充到IP地址栏内 选择文件…

【潜意识Java】深入理解 Java 面向对象编程(OOP)

目录 什么是面向对象编程&#xff08;OOP&#xff09;&#xff1f; 1. 封装&#xff08;Encapsulation&#xff09; Java 中的封装 2. 继承&#xff08;Inheritance&#xff09; Java 中的继承 3. 多态&#xff08;Polymorphism&#xff09; Java 中的多态 4. 抽象&…

PWM调节DCDC参数计算原理

1、动态电压频率调整DVFS SOC芯片的核电压、GPU电压、NPU电压、GPU电压等&#xff0c;都会根据性能和实际应用场景来进行电压和频率的调整。 即动态电压频率调整DVFS&#xff08;Dynamic Voltage and Frequency scaling&#xff09;&#xff0c;优化性能和功耗。 比如某SOC在…

OpenCV相关函数

一、二值化函数&#xff08;threshold&#xff09; 功能&#xff1a;将灰度图像转换为二值图像&#xff0c;通常用于图像分割。通过设置阈值&#xff0c;把图像中低于阈值的像素设为0&#xff0c;高于阈值的像素设为1。 参数&#xff1a; src&#xff1a;输入图像。 thresh&a…

bean后处理器的作用

这是beanFactory中常见的一些后处理器&#xff1a; 其中这俩个属于bean后处理器&#xff1a; internalAutowiredAnnotationProcessor解析Autowired、Value internalCommonAnnotationProcessor解析Resource、PostConstruct、PreDestroy Bean后处理器的作用&#xff1a;为Bean…