MQ四兄弟:如何保证消息顺序性

news2024/9/30 9:36:52

在当今的分布式系统架构中,消息队列(MQ)是不可或缺的组成部分。它们在确保系统组件之间高效通信方面发挥着关键作用。特别是在金融交易、物流跟踪等对消息处理顺序有严格要求的场景中,消息队列的顺序性保证显得更为重要。接下来,我们将深入探讨RabbitMQ、RocketMQ、Kafka和Pulsar这四个广泛使用的消息队列系统,分析它们是如何确保消息的顺序性,并附上相应的代码示例。

RabbitMQ

RabbitMQ作为一款成熟的开源消息队列,,基于AMQP(Advanced Message Queuing Protocol)协议构建,广泛应用于企业级应用中。虽然RabbitMQ本身并不保证严格的全局顺序性,但可以通过特定的设计模式来实现消息顺序性。

  1. 单一队列和单一消费者模式:确保一个队列只被一个消费者消费,这样可以保证消息按照发送的顺序被处理。因为队列本身就是一个先进先出的结构。
  2. 消息排序:在消息生产者端,为消息添加序列号或时间戳,消费者端根据这些信息对消息进行排序。

以下是一个简单的Java代码片段,展示了如何在RabbitMQ中发送消息。请注意,这个例子没有包含消息排序的逻辑,因为它依赖于具体的业务场景和消息结构。


public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

创建了一个连接和一个通道,然后声明了一个队列。之后,我们发布了一个简单的消息到队列中。为了保证消息的顺序性,我们需要确保所有消息都是通过同一个通道发送,并且在消费端也是由同一个消费者按顺序接收处理。

RocketMQ

RocketMQ作为阿里巴巴开源的分布式消息队列,在保证消息顺序性方面提供了一种基于MessageQueueSelector的解决方案。其核心思路是将有序的消息写入特定的队列,从而使消费端固定消费某个队列时,就能够按顺序消费消息。

具体来说,RocketMQ中有两个重要概念:

  • Topic: 逻辑上的消息主题
  • MessageQueue: 物理上存储消息的队列

一个Topic包含多个MessageQueue,消息会根据其内容进行哈希计算,分配到不同的MessageQueue中。用户可以通过提供MessageQueueSelector,对特定类型的消息强制分配到同一个MessageQueue,从而保证顺序性。

示例代码:

生产者

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("nameserver:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID" + orderId, ("Hello RocketMQ " + i).getBytes());
// 发送有序消息
producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer orderId = (Integer) arg; // 订单ID作为选择器的参数
        int index = orderId % mqs.size(); // 根据订单ID计算MessageQueue索引
        return mqs.get(index); // 返回该索引对应的MessageQueue
    }
}, orderId);

通过上述代码,发送端可以将具有相同订单号的消息发送到同一个MessageQueue。

消费端

 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name");
    consumer.setNamesrvAddr("nameserver:9876");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            context.setAutoCommit(true);
            for (MessageExt msg : msgs) {
                System.out.printf("Consumer: %s %n", new String(msg.getBody()));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });

消费端只需固定消费指定的MessageQueue,即可以保证消息按顺序被消费。

Kafka

Kafka通过Partition(分区)的概念来保证消息的顺序性。同一个Partition中的消息是有序的,但不同Partition之间是无序的。Producer在发送消息时可以指定消息要发送到的分区。Kafka默认提供了基于key的分区策略,确保具有相同key的消息会被发送到同一个分区,从而保证这些消息在这个分区内的顺序性。

以下是一个简单的 Java 代码示例,展示了如何在 Kafka 中发送和消费有序消息:

生产者代码

public class OrderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 假设我们有10个订单,每个订单的消息需要顺序处理
        for (int orderId = 0; orderId < 10; orderId++) {
            for (int i = 0; i < 5; i++) { // 每个订单发送5条消息
                String message = String.format("Order %d, Message %d", orderId, i);
                producer.send(new ProducerRecord<>("OrderTopic", Integer.toString(orderId), message));
            }
        }

        producer.close();
    }
}

producer.send(new ProducerRecord<>("OrderTopic", Integer.toString(orderId), message));

第二个参数是消息的键(key),这里使用订单ID作为键,确保相同订单ID的消息发送到同一个分区

消费者代码


Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order_consumer_group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("OrderTopic"));

在生产者代码中,我们使用了相同的 key(即订单ID)来确保消息被发送到同一个 Partition。在消费者代码中,我们订阅了整个 Topic,但由于我们使用了相同的 key 来发送消息,Kafka 会自动将具有相同 key 的消息路由到同一个 Partition,从而保证顺序性。

Pulsar

Apache Pulsar 通过分区主题(Partitioned Topics)来保证消息的顺序性。在Pulsar中,每个分区可以看作是一个独立的消息队列,分区内的消息保持发送顺序。为了确保消息的顺序性,生产者在发送消息时需要指定一个键(Key),Pulsar会根据这个键将消息路由到特定的分区。这样,具有相同键的消息就会被发送到同一个分区,并且按照发送的顺序进行消费。

生产者代码示例:


public class PulsarOrderProducer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/my-topic")
                .create();

        for (int i = 0; i < 100; i++) {
            String key = "OrderID" + (i % 10); // 假设OrderID是业务键
            String value = "Message" + i;
            producer.newMessage()
                    .key(key)
                    .value(value)
                    .send();
        }

        producer.close();
        client.close();
    }
}

消费者代码示例:

public class PulsarOrderConsumer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();

        while (true) {
            Message<String> msg = consumer.receive();
            try {
                // 处理消息
                System.out.printf("Message with key %s: %s", msg.getKey(), msg.getValue());
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}

在消费者代码中,我们使用了SubscriptionType.Exclusive,使订阅被独占,确保只有一个消费者能够消费分区内的消息,从而保证了消息的顺序性。

总结

尽管RabbitMQ、RocketMQ、Kafka和Pulsar这些消息队列系统虽然在实现细节上有所不同,但它们保证消息顺序性的核心思想都是相似的,即确保具有相同特征的消息被发送到同一队列或分区中,由于队列数据结构本身就是先进先出的结构,因此只需要消费者从该队列按顺序消费,就能够保证消息的有序性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1913404.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络——网络层(概念及IP地址划分)

目录 网络层概念 网络层向上层提供的两种服务 虚电路 网络提供数据报服务 虚电路服务与数据报服务的对比 网络层的两个层面 分组传送到路由器的运作 对网络层进行分层 网际协议IP 虚拟互联网络 IP地址 IP地址及其表示方法 IP地址的计算方式 IP地址的结构 …

算法的时间复杂度(C语言)

1.时间复杂度的定义 在计算机科学中&#xff0c;算法的时间复杂度是一个函数&#xff0c;它定量描述了算法的运行时间。一个算法所花费的时间与其中语句的执行次数成正比列&#xff0c;算法中的基本操作的执行次数&#xff0c;为算法的时间复杂度 例1&#xff1a; 计算Func1…

Midjourney 如何使用参考图像来提升图像的准确性和相似度?

&#x1f9d9;&#x1f3fc;图像提示 &#x1f9d9;&#x1f3fc;‍♂️ 您可以使用图像作为提示的一部分来影响作业的构图、样式和颜色。图像提示可以单独使用&#xff0c;也可以与文本提示一起使用 - 尝试组合具有不同样式的图像以获得最令人兴奋的结果。 &#x1f6e0;️实…

合合信息大模型加速器重磅上线,释放智能文档全新可能

目录 0 写在前面1 高速文档解析引擎&#xff1a;拓宽大模型认知边界2 文本嵌入模型acge&#xff1a;克服大模型感知缺陷3 行业赋能&#xff1a;以百川智能为例总结 0 写在前面 随着人工智能技术的飞速发展&#xff0c;大模型以强大的数字处理能力和深度学习能力&#xff0c;不…

XTuner 微调 LLM:1.8B, 部署

扫码立刻参与白嫖A100&#xff0c;书生大模型微调部署学习活动。亲测有效 内容来源&#xff1a;Tutorial/xtuner/personal_assistant_document.md at camp2 InternLM/Tutorial GitHubLLM Tutorial. Contribute to InternLM/Tutorial development by creating an account on G…

帕金森病患者应该如何确定自己每天适宜的饮水量?

帕金森病患者确定每天适宜的饮水量时&#xff0c;应该考虑到药物副作用、运动障碍和便秘等问题。建议的饮水量通常是每天6至8杯水&#xff0c;相当于约2000毫升左右。这个量可以根据个人的体重、气候条件、活动水平以及是否有其他健康问题进行适当调整。 为了科学合理地安排饮水…

【CVPR 2024】GART: Gaussian Articulated Template Models

【CVPR 2024】GART: Gaussian Articulated Template Models 一、前言Abstract1. Introduction2. Related Work3. Method3.1. Template Prior3.2. Shape Appearance Representation with GMM3.3. Motion Representation with Forward Skinning3.4. Reconstruct GART from Monocu…

【笔记】Android V 应用SDK升级适配和问题

说明 随着Google释放的Android版本,系统升级SDK到35,应用也需要升级上去,不然会报错。 Android Studio Jellyfish | 2023.3.1 | Android Developers Android Studio 预览版中的新功能 | Android Developers 当前版本的Android Studio

在超算平台或高性能集群上运行并行程序使用命令mpirun -np ,出现“no active ports detected”

问题&#xff1a; 在超算平台或高性能集群上运行并行程序使用命令mpirun -np &#xff0c;出现“no active ports detected” 具体使用的命令如下&#xff1a; Participant2"Solid" Solver2"linear_elasticity" nprocS4 # jie notes:24# Runecho "…

PCI PTS 硬件安全模块(HSM)模块化安全要求 v5.0

符合条件的 PCI SSC 利益相关者在 30 天的意见征询 (RFC) 期间审查 PCI PTS 硬件安全模块 (HSM) 模块化安全要求 v5.0 草案并提供反馈。 PCI PTS 硬件安全模块(HSM)模块化安全要求 v5.0图 从 7 月 8 日到 8 月 8 日&#xff0c;邀请符合条件的 PCI SSC 利益相关者在 30 天的意见…

Riscv 架构的合规测试

为啥直接关注riscv-arch-test&#xff0c;是因为RISCOF 测试框架使用的是riscv-arch-test 1. The architectural test 架构测试是一个单一的测试&#xff0c;代表了可编译和运行的最小测试代码。它是用汇编代码编写的&#xff0c;其产品是test signature。一个架构测试可能由…

BUG解决:postman可以请求成功,但Python requests请求报403

目录 问题背景 问题定位 问题解决 问题背景 使用Python的requests库对接物联数据的接口之前一直正常运行&#xff0c;昨天突然请求不通了&#xff0c;通过进一步验证发现凡是使用代码调用接口就不通&#xff0c;而使用postman就能调通&#xff0c;请求参数啥的都没变。 接口…

【机器学习】主成分分析(PCA):数据降维的艺术

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 主成分分析&#xff08;PCA&#xff09;&#xff1a;数据降维的艺术引言PCA的基…

ssm华天计算机面试刷题系统-计算机毕业设计源码22543

摘 要 华天计算机面试刷题系统是一款基于SSM&#xff08;Spring、Spring MVC、MyBatis&#xff09;框架、利用Java编程语言和MySQL数据库&#xff0c;开发的在线学习和测试平台。系统利用SSM框架及前端开发技术&#xff0c;实现了模块化开发和管理&#xff0c;前后端交互以及数…

【数据结构和算法的概念等】

目录 一、数据结构1、数据结构的基本概念2、数据结构的三要素2.1 数据的逻辑结构2.2 数据的存储&#xff08;物理&#xff09;结构2.3 数据的运算 二、算法1、算法概念2、算法的特性及特点3、算法分析 一、数据结构 1、数据结构的基本概念 数据&#xff1a; 是所有能输入到计…

利用SpringBoot+rabbitmq 实现邮件异步发送,保证100%投递成功

在之前的文章中&#xff0c;我们详细介绍了 SpringBoot 整合 mail 实现各类邮件的自动推送服务。 但是这类服务通常不稳定&#xff0c;当出现网络异常的时候&#xff0c;会导致邮件推送失败。 本篇文章将介绍另一种高可靠的服务架构&#xff0c;实现邮件 100% 被投递成功。类…

基于Java中的SSM框架实现水稻朔源信息系统项目【项目源码】

基于Java中的SSM框架实现水稻朔源信息系统演示 SSM框架 SSM框架是基于Spring、SpringMVC以及Mybatis实现的针对JAVA WEB端应用的开发框架&#xff0c;通过SSM框架结构可以实现以上三种框架的优点集合&#xff0c;从而实现更加高效便捷的系统开发和呈现。该框架结构通过Spring框…

红日靶场----(二)2.信息收集

上期我们已经猜解到了MySQL的账号密码。 这期我们开始目录枚举&#xff0c;我们知道目录枚举能不能获得有用的信息&#xff0c;需要强大的字典。 只有字典强大才能精准的爆破到目录及文件&#xff0c;下面我会介绍一个强大的字典文件。 目录枚举之SecLists字典&#xff1a; …

面试题 22:解释 Python 中的成员运算符?

欢迎莅临我的博客 &#x1f49d;&#x1f49d;&#x1f49d;&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

如何恢复永久删除的婚礼照片

我们的生活就像一本记忆剪贴簿&#xff0c;充满了褪色和模糊的快照。尽管我们想记住事情并留住快乐的回忆&#xff0c;但随着时间的流逝&#xff0c;它们会被冲走。为了避免这种情况并记住这些记忆&#xff0c;我们以照片的形式捕捉瞬间。这有助于缓解和分享那些快乐的时刻。但…