小心,丢失的消息!RocketMQ投递策略帮你解决问题!博学谷狂野架构师

news2025/1/11 21:43:10

RocketMQ消息投递策略

img

  • 作者: 博学谷狂野架构师
  • GitHub:GitHub地址 (有我精心准备的130本电子书PDF)

    只分享干货、不吹水,让我们一起加油!😄

前言

RocketMQ的消息投递分分为两种:一种是生产者往MQ Broker中投递;另外一种则是MQ broker 往消费者 投递(这种投递的说法是从消息传递的角度阐述的,实际上底层是消费者从MQ broker 中Pull拉取的)。本文将从模型的角度来阐述这两种机制。

RocketMQ的消息模型

RocketMQ 的消息模型整体并不复杂,如下图所示:

RocketMQ 消息模型

一个Topic(消息主题)可能对应多个实际的消息队列(MessgeQueue)

在底层实现上,为了提高MQ的可用性和灵活性,一个Topic在实际存储的过程中,采用了多队列的方式,具体形式如上图所示。每个消息队列在使用中应当保证先入先出(FIFO,First In First Out)的方式进行消费。

那么,基于这种模型,就会引申出两个问题:

  • 生产者 在发送相同Topic的消息时,消息体应当被放置到哪一个消息队列(MessageQueue)中?
  • 消费者 在消费消息时,应当从哪些消息队列中拉取消息?

消息的系统间传递时,会跨越不同的网络载体,这会导致消息的传播无法保证其有序请

生产者投递策略

轮询算法投递

默认投递方式:基于Queue队列轮询算法投递

默认情况下,采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀,算法如下图所示:

COPY/**
*  根据 TopicPublishInfo Topic发布信息对象中维护的index,每次选择队列时,都会递增
*  然后根据 index % queueSize 进行取余,达到轮询的效果
*
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return tpInfo.selectOneMessageQueue(lastBrokerName);
}

/**
*  TopicPublishInfo Topic发布信息对象中
*/
public class TopicPublishInfo {
    //基于线程上下文的计数递增,用于轮询目的
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();


    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                //轮询计算
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
}
代码示例

RocketMQ默认采用轮询投递策略

COPY/**
 * 轮询投递策略
 */
public class PollingProducer {

    public static void main(String[] args) throws Exception {
        //创建一个消息生产者,并设置一个消息生产者组
        DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");

        //指定 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        //初始化 Producer,整个应用生命周期内只需要初始化一次
        producer.start();

        for (int i = 0; i < 10; i++) {
            //创建一条消息对象,指定其主题、标签和消息内容
            Message msg = new Message(
                    /* 消息主题名 */
                    "topicTest",
                    /* 消息标签 */
                    "TagA",
                    /* 消息内容 */
                    ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );

            //发送消息并返回结果
            SendResult sendResult = producer.send(msg);

            System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ",存储queue:" + sendResult.getMessageQueue().getQueueId() + ",消息索引:" + i);
        }

        // 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
        producer.shutdown();
    }
}

打印结果

COPYproduct: 发送状态:SEND_OK,存储queue:0,消息索引:0
product: 发送状态:SEND_OK,存储queue:1,消息索引:1
product: 发送状态:SEND_OK,存储queue:2,消息索引:2
product: 发送状态:SEND_OK,存储queue:3,消息索引:3
product: 发送状态:SEND_OK,存储queue:0,消息索引:4
product: 发送状态:SEND_OK,存储queue:1,消息索引:5
product: 发送状态:SEND_OK,存储queue:2,消息索引:6
product: 发送状态:SEND_OK,存储queue:3,消息索引:7
product: 发送状态:SEND_OK,存储queue:0,消息索引:8
product: 发送状态:SEND_OK,存储queue:1,消息索引:9

消息投递延迟最小策略

默认投递方式的增强:基于Queue队列轮询算法和消息投递延迟最小的策略投递

默认的投递方式比较简单,但是也暴露了一个问题,就是有些Queue队列可能由于自身数量积压等原因,可能在投递的过程比较长,对于这样的Queue队列会影响后续投递的效果。

基于这种现象,RocketMQ在每发送一个MQ消息后,都会统计一下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些Queue队列投递的速度快。

在这种场景下,会优先使用消息投递延迟最小的策略,如果没有生效,再使用Queue队列轮询的方式。

COPYpublic class MQFaultStrategy {
    /**
     * 根据 TopicPublishInfo 内部维护的index,在每次操作时,都会递增,
     * 然后根据 index % queueList.size(),使用了轮询的基础算法
     *
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                // 从queueid 为 0 开始,依次验证broker 是否有效,如果有效
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    //基于index和队列数量取余,确定位置
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                // 从延迟容错broker列表中挑选一个容错性最好的一个 broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                     // 取余挑选其中一个队列
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
          // 取余挑选其中一个队列
            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
}

顺序投递策略

上述两种投递方式属于对消息投递的时序性没有要求的场景,这种投递的速度和效率比较高。而在有些场景下,需要保证同类型消息投递和消费的顺序性。

例如,假设现在有TOPIC topicTest,该 Topic下有4个Queue队列,该Topic用于传递订单的状态变迁,假设订单有状态:未支付已支付发货中(处理中)发货成功发货失败

在时序上,生产者从时序上可以生成如下几个消息:

COPY订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中(处理中) --> 订单T0000001:发货失败

消息发送到MQ中之后,可能由于轮询投递的原因,消息在MQ的存储可能如下:

消息顺序性

这种情况下,我们希望消费者消费消息的顺序和我们发送是一致的,然而,有上述MQ的投递和消费机制,我们无法保证顺序是正确的,对于顺序异常的消息,消费者 即使有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。

基于上述的情况,RockeMQ采用了这种实现方案:对于相同订单号的消息,通过一定的策略,将其放置在一个 queue队列中,然后消费者再采用一定的策略(一个线程独立处理一个queue,保证处理消息的顺序性),能够保证消费的顺序性

消息顺序性问题

至于消费者是如何保证消费的顺序行的,后续再详细展开,我们先看生产者是如何能将相同订单号的消息发送到同一个queue队列的:

生产者在消息投递的过程中,使用了 MessageQueueSelector 作为队列选择的策略接口,其定义如下:

COPYpublic interface MessageQueueSelector {
        /**
         * 根据消息体和参数,从一批消息队列中挑选出一个合适的消息队列
         * @param mqs  待选择的MQ队列选择列表
         * @param msg  待发送的消息体
         * @param arg  附加参数
         * @return  选择后的队列
         */
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

相应地,目前RocketMQ提供了如下几种实现:

在这里插入图片描述

默认实现
投递策略策略实现类说明
随机分配策略SelectMessageQueueByRandom使用了简单的随机数选择算法
基于Hash分配策略SelectMessageQueueByHash根据附加参数的Hash值,按照消息队列列表的大小取余数,得到消息队列的index
基于机器机房位置分配策略SelectMessageQueueByMachineRoom开源的版本没有具体的实现,基本的目的应该是机器的就近原则分配

现在大概看下策略的代码实现:

COPYpublic class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}
代码示例

实际的操作代码样例如下,通过订单号作为hash运算对象,就能保证相同订单号的消息能够落在相同的queue队列上

COPYpublic class OrderProducer {
    private static final List<ProductOrder> orderList = new ArrayList<>();

    static {
        orderList.add(new ProductOrder("XXX001", "订单创建"));
        orderList.add(new ProductOrder("XXX001", "订单付款"));
        orderList.add(new ProductOrder("XXX001", "订单完成"));
        orderList.add(new ProductOrder("XXX002", "订单创建"));
        orderList.add(new ProductOrder("XXX002", "订单付款"));
        orderList.add(new ProductOrder("XXX002", "订单完成"));
        orderList.add(new ProductOrder("XXX003", "订单创建"));
        orderList.add(new ProductOrder("XXX003", "订单付款"));
        orderList.add(new ProductOrder("XXX003", "订单完成"));
    }

    public static void main(String[] args) throws Exception {
        //创建一个消息生产者,并设置一个消息生产者组
        DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");

        //指定 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //初始化 Producer,整个应用生命周期内只需要初始化一次
        producer.start();

        for (int i = 0; i < orderList.size(); i++) {
            //获取当前order
            ProductOrder order = orderList.get(i);
            //创建一条消息对象,指定其主题、标签和消息内容
            Message message = new Message(
                    /* 消息主题名 */
                    "topicTest",
                    /* 消息标签 */
                    order.getOrderId(),
                    /* 消息内容 */
                    (order.toString()).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );

            //发送消息并返回结果 使用hash选择策略
            SendResult sendResult = producer.send(message, new SelectMessageQueueByHash(), order.getOrderId());

            System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ",存储queue:" + sendResult.getMessageQueue().getQueueId() + ",orderID:" + order.getOrderId() + ",type:" + order.getType());
        }

        // 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
        producer.shutdown();
    }
}

打印结果如下

COPYproduct: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单创建
product: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单付款
product: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单完成
product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单创建
product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单付款
product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单完成
product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单创建
product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单付款
product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单完成

消费者分配队列

如何为消费者分配queue队列?

RocketMQ对于消费者消费消息有两种形式:

  • BROADCASTING:广播式消费,这种模式下,一个消息会被通知到每一个消费者
  • CLUSTERING: 集群式消费,这种模式下,一个消息最多只会被投递到一个消费者上进行消费 模式如下:

消费者分配队列

广播式的消息模式比较简单,下面我们介绍下集群式。对于使用了消费模式为MessageModel.CLUSTERING进行消费时,需要保证一个消息在整个集群中只需要被消费一次。实际上,在RoketMQ底层,消息指定分配给消费者的实现,是通过queue队列分配给消费者的方式完成的:也就是说,消息分配的单位是消息所在的queue队列。即:

queue队列指定给特定的消费者后,queue队列内的所有消息将会被指定到消费者进行消费。

RocketMQ定义了策略接口AllocateMessageQueueStrategy,对于给定的消费者分组,和消息队列列表消费者列表当前消费者应当被分配到哪些queue队列,定义如下:

COPY/**
 * 为消费者分配queue的策略算法接口
 */
public interface AllocateMessageQueueStrategy {

    /**
     * Allocating by consumer id
     *
     * @param consumerGroup 当前 consumer群组
     * @param currentCID 当前consumer id
     * @param mqAll 当前topic的所有queue实例引用
     * @param cidAll 当前 consumer群组下所有的consumer id set集合
     * @return 根据策略给当前consumer分配的queue列表
     */
    List<MessageQueue> allocate(
        final String consumerGroup,
        final String currentCID,
        final List<MessageQueue> mqAll,
        final List<String> cidAll
    );

    /**
     * 算法名称
     *
     * @return The strategy name
     */
    String getName();
}

相应地,RocketMQ提供了如下几种实现:

消费者分配策略

算法名称含义
AllocateMessageQueueAveragely平均分配算法
AllocateMessageQueueAveragelyByCircle基于环形平均分配算法
AllocateMachineRoomNearby基于机房临近原则算法
AllocateMessageQueueByMachineRoom基于机房分配算法
AllocateMessageQueueConsistentHash基于一致性hash算法
AllocateMessageQueueByConfig基于配置分配算法

为了讲述清楚上述算法的基本原理,我们先假设一个例子,下面所有的算法将基于这个例子讲解。

假设当前同一个topic下有queue队列 10个,消费者共有4个,如下图所示:

消费者队列分配

下面依次介绍其原理:

平均分配算法

这里所谓的平均分配算法,并不是指的严格意义上的完全平均,如上面的例子中,10个queue,而消费者只有4个,无法是整除关系,除了整除之外的多出来的queue,将依次根据消费者的顺序均摊。

按照上述例子来看,10/4=2,即表示每个消费者平均均摊2个queue;而10%4=2,即除了均摊之外,多出来2个queue还没有分配,那么,根据消费者的顺序consumer-1consumer-2consumer-3consumer-4,则多出来的2个queue将分别给consumer-1consumer-2

最终,分摊关系如下:

  • consumer-1:3个
  • consumer-2:3个
  • consumer-3:2个
  • consumer-4:2个

平均分配算法

其代码实现非常简单:

COPYpublic class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG";
    }
}
演示效果
消费者A
COPYConsumer-线程名称=[32],接收queueId:[0],接收时间:[1608171677558],消息=[Hello Java demo RocketMQ 2]
Consumer-线程名称=[34],接收queueId:[1],接收时间:[1608171677580],消息=[Hello Java demo RocketMQ 3]
Consumer-线程名称=[36],接收queueId:[0],接收时间:[1608171677655],消息=[Hello Java demo RocketMQ 6]
Consumer-线程名称=[38],接收queueId:[1],接收时间:[1608171677679],消息=[Hello Java demo RocketMQ 7]
消费者B
COPYConsumer-线程名称=[35],接收queueId:[2],接收时间:[1608171677508],消息=[Hello Java demo RocketMQ 0]
Consumer-线程名称=[36],接收queueId:[3],接收时间:[1608171677535],消息=[Hello Java demo RocketMQ 1]
Consumer-线程名称=[37],接收queueId:[2],接收时间:[1608171677609],消息=[Hello Java demo RocketMQ 4]
Consumer-线程名称=[38],接收queueId:[3],接收时间:[1608171677635],消息=[Hello Java demo RocketMQ 5]
Consumer-线程名称=[39],接收queueId:[2],接收时间:[1608171677709],消息=[Hello Java demo RocketMQ 8]
Consumer-线程名称=[40],接收queueId:[3],接收时间:[1608171677734],消息=[Hello Java demo RocketMQ 9]

基于环形平均算法

环形平均算法,是指根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配。具体流程如下所示:

基于环形平均算法

这种算法最终分配的结果是:

  • consumer-1: #0,#4,#8
  • consumer-2: #1, #5, # 9
  • consumer-3: #2,#6
  • consumer-4: #3,#7

其代码实现如下所示:

COPY/**
 * Cycle average Hashing queue algorithm
 */
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        for (int i = index; i < mqAll.size(); i++) {
            if (i % cidAll.size() == index) {
                result.add(mqAll.get(i));
            }
        }
        return result;
    }

    @Override
    public String getName() {
        return "AVG_BY_CIRCLE";
    }
}
演示效果
设置算法
COPY//设置使用环形hash算法
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, "rocket_test_consumer_group", null, new AllocateMessageQueueAveragelyByCircle());
消费者A
COPYConsumer-线程名称=[35],接收queueId:[0],接收时间:[1608171903364],消息=[Hello Java demo RocketMQ 1]
Consumer-线程名称=[38],接收queueId:[2],接收时间:[1608171903411],消息=[Hello Java demo RocketMQ 3]
Consumer-线程名称=[39],接收queueId:[0],接收时间:[1608171903459],消息=[Hello Java demo RocketMQ 5]
Consumer-线程名称=[40],接收queueId:[2],接收时间:[1608171903508],消息=[Hello Java demo RocketMQ 7]
Consumer-线程名称=[41],接收queueId:[0],接收时间:[1608171903562],消息=[Hello Java demo RocketMQ 9]
消费者B
COPYConsumer-线程名称=[28],接收queueId:[3],接收时间:[1608171903346],消息=[Hello Java demo RocketMQ 0]
Consumer-线程名称=[30],接收queueId:[1],接收时间:[1608171903393],消息=[Hello Java demo RocketMQ 2]
Consumer-线程名称=[32],接收queueId:[3],接收时间:[1608171903443],消息=[Hello Java demo RocketMQ 4]
Consumer-线程名称=[34],接收queueId:[1],接收时间:[1608171903490],消息=[Hello Java demo RocketMQ 6]
Consumer-线程名称=[36],接收queueId:[3],接收时间:[1608171903540],消息=[Hello Java demo RocketMQ 8]

一致性hash分配算法

使用这种算法,会将consumer消费者作为Node节点构造成一个hash环,然后queue队列通过这个hash环来决定被分配给哪个consumer消费者

其基本模式如下:

一致性hash分配算法

一致性hash算法用于在分布式系统中,保证数据的一致性而提出的一种基于hash环实现的算法

算法实现上也不复杂,如下图所示:

COPYpublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                                   List<String> cidAll) {
    //省略部分代码
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                 consumerGroup,
                 currentCID,
                 cidAll);
        return result;
    }

    Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
    for (String cid : cidAll) {
        cidNodes.add(new ClientNode(cid));
    }
    //使用consumer id 构造hash环
    final ConsistentHashRouter<ClientNode> router; //for building hash ring
    if (customHashFunction != null) {
        router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
    } else {
        router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
    }
    //依次为 队列分配 consumer
    List<MessageQueue> results = new ArrayList<MessageQueue>();
    for (MessageQueue mq : mqAll) {
        ClientNode clientNode = router.routeNode(mq.toString());
        if (clientNode != null && currentCID.equals(clientNode.getKey())) {
            results.add(mq);
        }
    }

    return results;

}
演示效果
设置算法
COPY//设置使用环形hash算法
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null, "rocket_test_consumer_group", null, new AllocateMessageQueueConsistentHash());
消费者A
COPYConsumer-线程名称=[29],接收queueId:[0],接收时间:[1608172067310],消息=[Hello Java demo RocketMQ 0]
Consumer-线程名称=[31],接收queueId:[1],接收时间:[1608172067323],消息=[Hello Java demo RocketMQ 1]
Consumer-线程名称=[33],接收queueId:[2],接收时间:[1608172067345],消息=[Hello Java demo RocketMQ 2]
Consumer-线程名称=[37],接收queueId:[0],接收时间:[1608172067395],消息=[Hello Java demo RocketMQ 4]
Consumer-线程名称=[39],接收queueId:[1],接收时间:[1608172067418],消息=[Hello Java demo RocketMQ 5]
Consumer-线程名称=[40],接收queueId:[2],接收时间:[1608172067443],消息=[Hello Java demo RocketMQ 6]
Consumer-线程名称=[41],接收queueId:[0],接收时间:[1608172067494],消息=[Hello Java demo RocketMQ 8]
Consumer-线程名称=[42],接收queueId:[1],接收时间:[1608172067518],消息=[Hello Java demo RocketMQ 9]
消费者B
COPYConsumer-线程名称=[28],接收queueId:[3],接收时间:[1608172067383],消息=[Hello Java demo RocketMQ 3]
Consumer-线程名称=[30],接收queueId:[3],接收时间:[1608172067475],消息=[Hello Java demo RocketMQ 7]

机房临近分配算法

该算法使用了装饰者设计模式,对分配策略进行了增强。一般在生产环境,如果是微服务架构下,RocketMQ集群的部署可能是在不同的机房中部署,其基本结构可能如下图所示:

机房临近分配

对于跨机房的场景,会存在网络、稳定性和隔离心的原因,该算法会根据queue的部署机房位置和消费者consumer的位置,过滤出当前消费者consumer相同机房的queue队列,然后再结合上述的算法,如基于平均分配算法在queue队列子集的基础上再挑选。相关代码实现如下:

COPY@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                                   List<String> cidAll) {
    //省略部分代码
    List<MessageQueue> result = new ArrayList<MessageQueue>();

    //将MQ按照 机房进行分组
    Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
    for (MessageQueue mq : mqAll) {
        String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
        if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
            if (mr2Mq.get(brokerMachineRoom) == null) {
                mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
            }
            mr2Mq.get(brokerMachineRoom).add(mq);
        } else {
            throw new IllegalArgumentException("Machine room is null for mq " + mq);
        }
    }

    //将消费者 按照机房进行分组
    Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
    for (String cid : cidAll) {
        String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
        if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
            if (mr2c.get(consumerMachineRoom) == null) {
                mr2c.put(consumerMachineRoom, new ArrayList<String>());
            }
            mr2c.get(consumerMachineRoom).add(cid);
        } else {
            throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
        }
    }

    List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();

    //1.过滤出当前机房内的MQ队列子集,在此基础上使用分配算法挑选
    String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
    List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
    List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
    if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
        allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
    }

    //2.不在同一机房,按照一般策略进行操作
    for (String machineRoom : mr2Mq.keySet()) {
        if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
        }
    }

    return allocateResults;
}
基于机房分配算法

该算法适用于属于同一个机房内部的消息,去分配queue。这种方式非常明确,基于上面的机房临近分配算法的场景,这种更彻底,直接指定基于机房消费的策略。这种方式具有强约定性,比如broker名称按照机房的名称进行拼接,在算法中通过约定解析进行分配。

其代码实现如下:

COPY/**
 * Computer room Hashing queue algorithm, such as Alipay logic room
 */
public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
    private Set<String> consumeridcs;

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        int currentIndex = cidAll.indexOf(currentCID);
        if (currentIndex < 0) {
            return result;
        }
        List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
        for (MessageQueue mq : mqAll) {
            String[] temp = mq.getBrokerName().split("@");
            if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                premqAll.add(mq);
            }
        }

        int mod = premqAll.size() / cidAll.size();
        int rem = premqAll.size() % cidAll.size();
        int startIndex = mod * currentIndex;
        int endIndex = startIndex + mod;
        for (int i = startIndex; i < endIndex; i++) {
            result.add(mqAll.get(i));
        }
        if (rem > currentIndex) {
            result.add(premqAll.get(currentIndex + mod * cidAll.size()));
        }
        return result;
    }

    @Override
    public String getName() {
        return "MACHINE_ROOM";
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}

基于配置分配算法

这种算法单纯基于配置的,非常简单,实际使用中可能用途不大。代码如下:

COPYpublic class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
    private List<MessageQueue> messageQueueList;

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        return this.messageQueueList;
    }

    @Override
    public String getName() {
        return "CONFIG";
    }

    public List<MessageQueue> getMessageQueueList() {
        return messageQueueList;
    }

    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
        this.messageQueueList = messageQueueList;
    }
}

消费者如何指定分配算法

消费者构造方法

在DefaultMQPushConsumer构造方法中可以传入分配策略

默认情况下,消费者使用的是AllocateMessageQueueAveragely算法,也可以自己指定:

COPYpublic class DefaultMQPushConsumer{    
    /**
     * Default constructor.
     */
    public DefaultMQPushConsumer() {
        this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
    }


    /**
     * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
     *
     * @param consumerGroup Consume queue.
     * @param rpcHook RPC hook to execute before each remoting command.
     * @param allocateMessageQueueStrategy message queue allocating algorithm.
     */
    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.consumerGroup = consumerGroup;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }
}

我们看到默认使用了AllocateMessageQueueAveragely平均分配策略

使用其他分配策略

如果需要使用其他分配策略,使用方式如下

本文由传智教育博学谷狂野架构师教研团队发布。

如果本文对您有帮助,欢迎关注点赞;如果您有任何建议也可留言评论私信,您的支持是我坚持创作的动力。

转载请注明出处!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/430600.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java中级面试题

1.假如有两个线程共同操作数据库&#xff0c;以乐观锁的角度考虑&#xff0c;怎么确保不会发生并发问题&#xff1f; PS&#xff1a;考点是CAS&#xff0c;比较并替换。CAS中有三个值&#xff0c;内存中的值&#xff0c;新值&#xff0c;旧值。 假如内存中的值是2000&#xf…

[C++]string类的模拟实现和相关函数的详解

目录string总体架构具体实现默认成员函数构造函数构造拷贝函数析构函数赋值重载[]相关操作函数c_str() && size()reserve() && resize()push_back() && append()find()inserterase() && clear其余操作符重载< 、 <、 >、 >、 !<…

【系统集成项目管理工程师】项目整体管理

&#x1f4a5;十大知识领域&#xff1a;项目整体管理 项目整体管理包括以下 6 个过程: 制定项目章程定项目管理计划指导与管理项目工作监控项目工作实施整体变更控制结束项目或阶段过程 一、制定项目章程 制定项目章程。编写一份正式文件的过程&#xff0c;这份文件就是项目章程…

某程序员哀叹:月薪四五万,却每天极度焦虑痛苦,已有生理性不适,又不敢裸辞,怎么办?

高薪能买来快乐吗&#xff1f; 来看看这位程序员的哀叹&#xff1a; 实在是扛不住了&#xff0c;每天都在极度焦虑和痛苦中度过&#xff0c;早上起来要挣扎着做心理建设去上班&#xff0c;已经产生生理性的头晕恶心食欲不振。有工作本身的原因&#xff0c;更多是自己心态的问…

OpenCV+FFmpeg 实现人脸检测Rtmp直播推流(Python快速实现)

实现效果 windows平台笔记本摄像头视频采集、人脸识别&#xff0c;识别后将视频推流到RTMP流媒体服务器&#xff0c;在任意客户端可以进行RTMP拉流播放。 效果如图&#xff1a; 使用VLC播放器进行拉流。 准备工作 需要先安装OpenCV的python包以及FFmpeg。 对于ffmpeg有两…

Java——删除链表中重复的节点

题目链接 牛客在线oj题——删除链表中重复的节点 题目描述 在一个排序的链表中&#xff0c;存在重复的结点&#xff0c;请删除该链表中重复的结点&#xff0c;重复的结点不保留&#xff0c;返回链表头指针。 例如&#xff0c;链表 1->2->3->3->4->4->5 处…

【Vue】学习笔记-数据代理

数据代理 Object.defineproperty方法 <script type"text/javascript">let number18let person{name:张三,sex:男,}//age属性 不参与遍历Object.defineProperty(person,age,{//value:18,//enumerable:true, //控制属性是否可以枚举&#xff0c;默认值是false//…

科技成果评价最新攻略,你确定不来看看?

一、什么是科技成果评价&#xff1f; 是指按照委托者的要求&#xff0c;由具有评价资质的第三方专业机构聘请专家&#xff0c;坚持实事求是、科学民主、客观公正、注重质量、讲求实效的原则&#xff0c;依照规定的程序和标准&#xff0c;对被评价科技成果进行审查与辨别&#…

[Java Web]VUE | vue:一项Java Web开发中不可或缺的前端技术

⭐作者介绍&#xff1a;大二本科网络工程专业在读&#xff0c;持续学习Java&#xff0c;努力输出优质文章 ⭐作者主页&#xff1a;逐梦苍穹 ⭐所属专栏&#xff1a;Java Web ⭐如果觉得文章写的不错&#xff0c;欢迎点个关注一键三连&#x1f609;有写的不好的地方也欢迎指正&a…

AD19 基础应用技巧(快速定义PCB板框,CAD中DWG转DXF格式导入)

【B站一个假的攻城狮】导入CAD图纸到PCB&#xff0c;Altium Designer 21教程&#xff0c;第九节。 http://www.keyboard-layout-editor.com/ http://builder.swillkb.com/ 1、打开中望CAD&#xff0c;并打开一张图纸文件&#xff0c;为了能把孔表达清楚&#xff0c;开孔断面图…

React(六) —— redux

&#x1f9c1;个人主页&#xff1a;个人主页 ✌支持我 &#xff1a;点赞&#x1f44d;收藏&#x1f33c;关注&#x1f9e1; 文章目录⛳Redux&#x1f346;redux定义&#x1f490;redux使用原则&#x1f370;redux使用场景&#x1f9ca;redux工作流程&#x1f96b;redux基本创建…

14.创建组件

组件可以理解为页面的拼图块&#xff0c;一个完整的页面是由若干个组件拼成的 在vue中规定&#xff0c;组件的后缀名为vue&#xff0c;每一个vue文件中应该包含三个大标签 template 组件的模板结构&#xff0c;可以理解为htmlscript 组件的JS&#xff0c;控制组件要执行什么动…

区域检验管理系统(云LIS)源码

1、区域检验管理系统&#xff08;云LIS&#xff09;概述 云LIS是为区域医疗提供临床实验室信息服务的计算机应用程序&#xff0c;可协助区域内所有临床实验室相互协调并完成日常检验工作&#xff0c;对区域内的检验数据进行集中管理和共享&#xff0c;通过对质量控制的管理&am…

Java每日一练(20230418)

目录 1. N皇后 II &#x1f31f;&#x1f31f;&#x1f31f; 2. 字符串相乘 &#x1f31f;&#x1f31f; 3. 买卖股票的最佳时机 &#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏 Java每日一…

“Natural Earth II“ === “Natural Earth II“是false?你知道空 格的四种写法吗?

前言 有一回对我说道&#xff0c;“你学过前端么&#xff1f;”我略略点一点头。他说&#xff0c;“学过前端&#xff1f;……我便考你一考。html 里面的空格&#xff0c;怎样 coding 的&#xff1f;”我想&#xff0c;讨饭一样的人&#xff0c;也配考我么&#xff1f;便回过脸…

计算机网络 - 网络中的基本概念

前言 本篇介绍网络的一些基本概念&#xff0c;认识IP地址&#xff0c;端口号&#xff0c;协议&#xff1b;了解常用的网络协议模型&#xff0c;知道数据如何封装与分用的&#xff1b;为以后学习计算机网络其它知识做铺垫&#xff0c;如有错误&#xff0c;请在评论区指正&#…

Java数据结构 二叉树基本知识 二叉树遍历

二叉树很简单的&#xff0c;试试呗~ 文章目录 Java数据结构 & 二叉树基本知识 & 二叉树遍历1. 树的基本定义2. 树的基本概念2.1 例子2.2 树的代码表示&#xff1a; 3. 二叉树3.1 特殊节点3.2 特殊的二叉树3.3 二叉树的性质3.3.1 证明第三点3.3.2 证明第四点 4. 二叉树遍…

MySQL-MHA高可用(一)

目录 &#x1f341;同步概念 &#x1f341;工作原理 &#x1f343;环境拓扑 &#x1f341;环境准备 &#x1f342;manager &#x1f342;master1 &#x1f342;master2 &#x1f342;slave &#x1f343;配置半同步复制 &#x1f341;master1 &#x1f341;master2 &#x1f34…

函数 tcgetpgrp tcsetpgrp 和 tcgetsid

① tcgetpgrp & tcsetpgrp 函数 tcgetpgrp函数是用来获取前台进程组的ID #include <stdio.h> #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> #include <fcntl.h> int main() {printf("我的ID&#xff1a;%d---我…

【MySQL学习】MySQL库的操作

目录一、查看数据库的连接二、数据库的创建三、字符集和校验规则3.1 查看数据库默认的字符集以及校验规则3.2 查看数据库支持的字符集以及校验规则3.3 校验规则对数据库的影响四、操纵数据库4.1 查看数据库4.2 显示创建语句4.3 修改数据库4.4 数据库的删除五、数据库的备份与恢…