SpringBoot学习小结之RocketMQ

news2025/1/11 0:38:09

文章目录

  • 前言
  • 一、架构设计
    • 1.1 架构图
    • 1.2 消息
    • 1.3 工作流程
  • 二、部署
    • 2.1 单机
    • 2.2 集群
  • 三、Springboot Producter
    • 3.1 准备
    • 3.2 pom依赖、yml 配置
    • 3.3 普通消息
    • 3.4 顺序、批量、延迟消息
    • 3.5 事务消息
  • 四、Springboot Consumer
    • 4.1 配置
    • 4.2 普通Push消费
    • 4.3 回复
    • 4.4 集群和广播
    • 4.5 并发和顺序
    • 4.6 消息过滤
    • 4.7 重试和死信
    • 4.8 设置消费组负载均衡策略
    • 4.9 设置offset
    • 4.10 Pull 消息
  • 五、总结
    • 5.1 优点
    • 5.2 缺点
  • 参考

前言

在当今互联网时代,随着数据规模和业务复杂度的不断增长,分布式消息中间件作为实现系统解耦、异步通信和削峰填谷的重要工具,扮演着越来越关键的角色。而在众多的消息中间件中,Apache RocketMQ 以其出色的性能、高可用性和可扩展性,成为了许多企业构建分布式系统的首选之一。

RocketMQ 是一种开源分布式消息队列系统, 由阿里巴巴集团开发并在2012年开源,现已成为 Apache 软件基金会的顶级项目之一。它具备高吞吐量、低延迟、高可靠性和强大的水平扩展能力等特性,被广泛应用于互联网、金融、电商、物联网等各个领域。

本文将带您深入了解 RocketMQ,探索其架构设计、部署以及在 Springboot 中使用,帮助您更好地理解和应用这一强大的消息中间件,提升系统的性能和可靠性,实现业务的快速发展和创新。

一、架构设计

1.1 架构图

RocketMQ 的组成部分

组件功能特点
Name Server负责维护集群中所有 Broker 的路由信息和消息队列的状态信息各个 NameServer 相互独立,没有信息转发
Broker存储消息,接收来自生产者的消息并将其提供给消费者也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息
Producer消息的生产者,负责将消息发送到 Broker通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递过程支持快速失败和重试
Consumer消息的消费者,从 Broker 中获取消息进行处理支持推(push)和拉(pull)两种模式对消息进行消费,支持集群方式和广播方式的消费,提供实时消息订阅机制,满足大多数用户的需求

消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

  • Push 是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
  • Pull 是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

1.2 消息

一些和消息相关的核心概念

名词含义
Message消息系统所传输信息的物理载体,生产和消费数据的最小单位。一条消息必须有一个主题(Topic)。消息 body 默认最大 4M。
Topic一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题。
Tag为消息设置的标志,用于同一 Topic 下区分不同类型的消息,目前只支持每个消息设置一个, 一般在Topic后加 :Tag名。
ProducerGroup一组 Producer 的集合,这些 Producer 共同实现了某个业务逻辑,通常是发送相同类型或相关类型的消息到同一个 Topic。
ConsumerGroup一组 Consumer 的集合,ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式和集群模式。在集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,在广播模式下,同一个 ConsumerGroup 中每个 Consumer 实例都需要处理全部的消息。
Message Queue一个 Topic 下的物理存储单元,用于存储发送到该 Topic 的消息。每个 Topic 可以有多个消息队列,消息会根据一定的规则被分配到这些消息队列中。
Offset消费者在消息队列中消费消息的位置信息。

消息发送返回的状态

状态含义
SEND_OK消息发送成功,要注意的是消息发送成功也不意味着它是可靠的,要确保不会丢失任何消息,还应启用同步 Master 服务器或同步刷盘,即 SYNC_MASTER 或 SYNC_FLUSH。
FLUSH_DISK_TIMEOUT消息发送成功但是服务器刷盘超时,此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失,消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果 Broker 服务器设置了刷盘方式为同步刷盘,即 FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当 Broker 服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。
FLUSH_SLAVE_TIMEOUT消息发送成功,但是服务器同步到 Slave 时超时,此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失,如果 Broker 服务器的角色是同步 Master,即 SYNC_MASTER(默认是异步 Master 即 ASYNC_MASTER),并且从 Broker 服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到 Slave 服务器超时。
SLAVE_NOT_AVAILABLE消息发送成功,但是此时 Slave 不可用,如果 Broker 服务器的角色是同步 Master,即 SYNC_MASTER(默认是异步 Master 服务器即 ASYNC_MASTER),但没有配置 slave Broker 服务器,则将返回该状态——无 Slave 服务器可用。

Group 和 Cluster 区别

  • Group 和 Cluster 是两个不同的概念,前者是逻辑上,后者是物理上。
  • 以Producer举例,一个 Producer Cluster 可以包含多个 Producer Group,而一个 Producer Group 只属于一个 Producer Cluster。换句话说,一个生产者集群可以包含多个逻辑上不同的生产者组,每个生产者组都有其特定的 ProducerGroup 标识。

1.3 工作流程

  1. 启动 NameServer

    NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。

  2. 启动 Broker

    与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟 Broker 的映射关系。

  3. 创建 Topic

    创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic , 但不建议。

  4. 生产者发送消息

    启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker建立长连接从而向 Broker 发消息。

  5. 消费者接受消息

    跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker上,然后直接跟 Broker 建立连接通道,然后开始消费消息。

二、部署

2.1 单机

mqnamesrv
mqbroker -n localhost:9876

mqnamesrv 默认端口9876,可通过添加配置文件 namesrv.conf 修改

listenPort = 9878
mqnamesrv -c ../conf/namesrv.conf
mqbroker -n localhost:9878

broker 默认端口10911,修改 broker 端口号, 在配置文件 broker.cnf 添加

listenPort = 11087

并在启动时指定配置文件

mqbroker -n localhost:9877 -c ../conf/broker.conf

2.2 集群

6台机器,配置两个 NameServer、4个 Broker(2主2从)

mqnamesrv -n 192.168.1.1:9876
mqnamesrv -n 192.168.1.2:9876

mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties
mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties
mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.propertie
mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties

同步双写配置,每个 Master 配置一个 Slave,有多对 Master-Slave ,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

三、Springboot Producter

3.1 准备

mqnamesrv -c ../conf/namesrv.conf
mqbroker -n localhost:9878 -c ../conf/broker.conf

mqadmin updateTopic -b 127.0.0.1:11087 -t demo-topic

3.2 pom依赖、yml 配置

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
rocketmq:
  name-server: localhost:9878
  producer:
    group: test-producter
demo:
  rocketmq:
    topic: demo-topic
spring:
  application:
    name: producter

3.3 普通消息

Apache RocketMQ 可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。

使用场景:

  • 同步:一些对消息可靠性要求较高的场景,如订单支付、账单通知等。
  • 异步:一些链路耗时较长,对响应时间较为敏感的业务的场景,如视频上传后通知启动转码服务,转码完成后通知推送转码结果 等。
  • 单向:一些不需要关心消息发送结果,只需简单地发送消息而不关心是否成功的场景,如日志记录等。
@Autowired
private RocketMQTemplate rocketMQTemplate;

@Value("${demo.rocketmq.topic}")
private String topic;

@Test
void test1_sync() {
    SendResult sendResult = rocketMQTemplate.syncSend(topic, "同步发送信息");
    log.info("同步发送结果:{}", sendResult);
    rocketMQTemplate.convertAndSend(topic, "simple-send-topic simple hello");
}

@Test
void test2_async() {
    rocketMQTemplate.asyncSend(topic, "异步发送信息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步发送结果:{}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.info("异步发送异常: {}", e.toString());
            }
        });
}

@Test
void test3_oneway() {
    rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload("one way message").build());
    rocketMQTemplate.convertAndSend(topic, "simple-send-topic simple hello");
    log.info("发送 oneway message");
}

@Test
void test4_tags() {
    Message message = new Message(topic, "hello tags message".getBytes());
    message.setTags("tagA");
    try {
        rocketMQTemplate.getProducer().send(message);
    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
        log.error("syncSend failed. message:{} ", message);
        throw new RuntimeException(e);
    }
    log.info("发送 tag message");
}

3.4 顺序、批量、延迟消息

  • 顺序消息是一种在消息发送和消费过程中要求严格按照特定顺序进行处理的消息。在 RocketMQ 中,顺序消息遵循先进先出(FIFO)的原则,确保按照消息发布的顺序进行消费

    在 RocketMQ 中,支持分区顺序消息,通过对消息进行分区,确保同一个分区键的消息会被分配到同一个队列中,并按照顺序进行消费。

    RocketMQ 的消息顺序性包括生产顺序性和消费顺序性。为了实现消息的顺序性,需要同时满足生产者和消费者两方面的条件:

    • 生产顺序性:确保单个生产者串行地发送消息,并按序存储和持久化。要满足生产顺序性,需要保证消息的发送是单一生产者、串行发送的。
    • 消费顺序性:消费者按照消息的顺序进行处理。RocketMQ 通过设置相同的分区键,将消息发送至同一队列中,从而保证消费顺序性。

    顺序消息适用于需要严格保持事件顺序的场景,如有序事件处理、撮合交易、数据实时增量同步等。例如,在订单处理场景中,需要确保订单生成、付款和发货等操作按照顺序执行,顺序消息能够满足这种需求。

  • 批量消息是在对吞吐率有一定要求的情况下,RocketMQ 可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少 API 和网络调用次数。

  • 延迟消息发送是指消息发送到 RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到 Consumer 进行消费。

    在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

    投递等级(delay level)延迟时间投递等级(delay level)延迟时间
    11s106min
    25s117min
    310s128min
    430s139min
    51min1410min
    62min1520min
    73min1630min
    84min171h
    95min182h
@Test
void test5_order() {
    for (int q = 0; q < 4; q++) {
        // send to 4 queues
        List<org.springframework.messaging.Message> msgs = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int msgIndex = q * 10 + i;
            String msg = String.format("Hello RocketMQ Batch Msg#%d to queue: %d", msgIndex, q);
            msgs.add(MessageBuilder.withPayload(msg).
                     setHeader(RocketMQHeaders.KEYS, "KEY_" + msgIndex).build());
        }
        SendResult sr = rocketMQTemplate.syncSendOrderly(topic, msgs, q + "", 60000);
        System.out.println("--- Batch messages orderly to queue :" + sr.getMessageQueue().getQueueId() + " send result :" + sr);
    }
}

@Test
void test6_batch() {
    List<org.springframework.messaging.Message> msgs = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                 setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
    }

    SendResult sr = rocketMQTemplate.syncSend(topic, msgs, 60000);
    log.info("--- Batch messages send result : {}",  sr);
}

@Test
void test7_lazy() {

    int totalMessagesToSend = 100;
    for (int i = 0; i < totalMessagesToSend; i++) {
        Message message = new Message(topic, ("Hello scheduled message " + i).getBytes());
        // This message will be delivered to consumer 10 seconds later.
        message.setDelayTimeLevel(3);

        try {
            rocketMQTemplate.getProducer().send(message);
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            log.error("error {}", e.toString());
            throw new RuntimeException(e);
        }
    }

    log.info("final send result");
}
}

3.5 事务消息

  1. 生产者将半事务消息发送至 RocketMQ Broker

  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

  6. 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置

  7. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  8. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

@Test
void test8_transaction() {
    for (int i = 0; i < 10; i++) {
        try {
            org.springframework.messaging.Message msg = MessageBuilder.withPayload("rocketmq transactional message " + i).
                setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
            SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(null,
                                                                              topic, msg, null);
            log.info("------RocketMQTemplate send Transactional msg body = {}  , sendResult= {}",
                     msg.getPayload(), sendResult.getSendStatus());

            Thread.sleep(10);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message msg, Object arg) {
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        log.info("#### executeLocalTransaction is executed, msgTransactionId={}", transId);
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(transId, status);
        if (status == 0) {
            // Return local transaction with success(commit), in this case,
            // this message will not be checked in checkLocalTransaction()
            log.info("    # COMMIT # Simulating msg {} related local transaction exec succeeded! ###", msg.getPayload());
            return RocketMQLocalTransactionState.COMMIT;
        }

        if (status == 1) {
            // Return local transaction with failure(rollback) , in this case,
            // this message will not be checked in checkLocalTransaction()
            log.info("    # ROLLBACK # Simulating {} related local transaction exec failed! ", msg.getPayload());
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        log.info("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN!");
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message msg) {
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
        Integer status = localTrans.get(transId);
        if (null != status) {
            switch (status) {
                case 0:
                    retState = RocketMQLocalTransactionState.COMMIT;
                    break;
                case 1:
                    retState = RocketMQLocalTransactionState.ROLLBACK;
                    break;
                case 2:
                    retState = RocketMQLocalTransactionState.UNKNOWN;
                    break;
            }
        }
        log.info("------ !!! checkLocalTransaction is executed once," +
                 " msgTransactionId={}, TransactionState=%s status={} {}",
                 transId, retState, status);
        return retState;
    }


}

四、Springboot Consumer

4.1 配置

rocketmq:
  name-server: localhost:9878
  consumer:
    topic: demo-topic
    group: consumer-group 

demo:
  rocketmq:
    group: broadcast-group
    consumer:
      tag: tagA
spring:
  application:
    name: consumer

4.2 普通Push消费

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${rocketmq.consumer.group}"
)
public class MessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("接收消息:{}", message);
    }
}

4.3 回复

@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${demo.rocketmq.bytesRequestConsumer}")
public class ConsumerWithReplyBytes implements RocketMQReplyListener<MessageExt, byte[]> {

    @Override
    public byte[] onMessage(MessageExt message) {
        System.out.printf("------- ConsumerWithReplyBytes received: %s \n", message);
        return "reply message content".getBytes();
    }
}

4.4 集群和广播

默认集群

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${demo.rocketmq.topic}",
        consumerGroup = "${demo.rocketmq.consumer.group}",
        messageModel = MessageModel.BROADCASTING
)
public class MessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("广播接收消息:{}", message);
    }
}

4.5 并发和顺序

默认并发

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${demo.rocketmq.topic}",
        consumerGroup = "${demo.rocketmq.consumer.group}",
        consumeMode = ConsumeMode.ORDERLY
)
public class OrderMessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("顺序接收消息:{}", message);
    }
}

4.6 消息过滤

消息过滤分为 Tag 过滤和 SQL 过滤,默认Tag过滤

在 SQL 语法中,Tag 的属性值为 TAGS,开启属性过滤首先要在 Broker 端设置配置enablePropertyFilter=true

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${rocketmq.consumer.group}",
        selectorExpression = "${demo.rocketmq.consumer.tag}"
)
public class FilterMessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("过滤Tag接收消息:{}", message);
    }
}
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${rocketmq.consumer.group}",
        selectorExpression = "TAGS is not null and TAGS in ('tagA', 'tagB')",
        selectorType = SelectorType.SQL92
)
public class SQLFilterMessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("SQL过滤Tag接收消息:{}", message);
    }
}

4.7 重试和死信

默认重试次数为 -1,即 异步为16,顺序为Interger.MAXVALUE

异步重试时间间隔

第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间
110s97min
230s108min
31min119min
42min1210min
53min1320min
64min1430min
75min151h
86min162h
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${rocketmq.consumer.group}",
        maxReconsumeTimes = 3
)
public class RetryMessageListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("接收消息:{}", message);
    }
}

当一条消息在初次消费时失败,RocketMQ会自动进行消息重试。若达到最大重试次数后仍然失败,则表明该消息在正常情况下无法被正确消费。

此时,该消息并非立即丢弃,而是会被发送到特殊队列,称为死信队列(Dead-Letter Queue),而这类消息则被称为死信消息(Dead-Letter Message)。

死信队列是死信Topic下唯一的单独队列,而死信Topic的名称通常为%DLQ%ConsumerGroupName,其中 ConsumerGroupName 为对应消费者组的名称。

通过RocketMQ Admin工具或 RocketMQ Dashboard,可以查询到这些死信消息的信息,但它们不会再被消费。

4.8 设置消费组负载均衡策略

例如一个 Topic 有8个队列,一个消费组中有3个消费者,那这三个消费者各自去消费哪些队列。RocketMQ 默认提供了如下负载均衡算法:

  • AllocateMessageQueueAveragely:平均连续分配算法。
  • AllocateMessageQueueAveragelyByCircle:平均轮流分配算法。
  • AllocateMachineRoomNearby:机房内优先就近分配。
  • AllocateMessageQueueByConfig:手动指定,这个通常需要配合配置中心,在消费者启动时,首先先创建 AllocateMessageQueueByConfig 对象,然后根据配置中心的配置,再根据当前的队列信息,进行分配,即该方法不具备队列的自动负载,在 Broker 端进行队列扩容时,无法自动感知,需要手动变更配置。
  • AllocateMessageQueueByMachineRoom:消费指定机房中的队列,该分配算法首先需要调用该策略的 setConsumeridcs(Set<String> consumerIdCs) 方法,用于设置需要消费的机房,将刷选出来的消息按平均连续分配算法进行队列负载。
  • AllocateMessageQueueConsistentHash 一致性 Hash 算法。
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${demo.rocketmq.rebalancegroup}"
)
public class RebalanceMessageListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {


    @Override
    public void onMessage(String message) {
        log.info("rebalance: {}", message);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
    }
}

4.9 设置offset

消费者将从上次消费的位置开始消费消息

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "${rocketmq.consumer.topic}",
        consumerGroup = "${rocketmq.consumer.group}")
public class PushMessageListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String message) {
        log.info("push {}", message);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    }
}

4.10 Pull 消息

// 配置
rocketMQTemplate.getConsumer().subscribe(topic, "*");
rocketMQTemplate.getConsumer().setPullBatchSize(20);
List<MessageExt> messageExts = rocketMQTemplate.getConsumer().poll(1000);
log.info("poll 拉取消息:{}", messageExts);

// 类似
List<String> receive = rocketMQTemplate.receive(String.class, 1000);
log.info("poll 拉取消息:{}", receive);

五、总结

5.1 优点

  1. 稳定性高:RocketMQ 在阿里巴巴内部被广泛应用,经过多年的生产环境验证,稳定性高。
  2. 高并发:支持高并发的消息处理,可以满足大量的消息生产和消费需求。
  3. 适应性广:支持多种消息协议,如 JMS、OpenMessaging 等,并且可以很容易地与不同的系统进行集成。
  4. 高可用性:RocketMQ 支持主从和分布式部署,可以保证在任何节点宕机的情况下服务仍然可用。
  5. 高可靠性:提供了三种级别的消息传递保证,并且支持事务消息,可以保证消息的可靠传递。
  6. 支持集群:提供了完善的集群机制,可以实现高可用和负载均衡。

5.2 缺点

  1. 学习曲线较陡峭:RocketMQ 的配置和使用较为复杂,需要一定时间来学习。
  2. 运维要求较高:RocketMQ 的运维工作较为复杂,需要有专业的团队来维护。
  3. 不适合大数据处理: 相对于一些专注于大数据处理的消息中间件,如Kafka,RocketMQ在大数据处理方面的性能可能不如人意。

参考

  1. https://rocketmq.apache.org/zh/docs/4.x/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1696584.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

华为造车布局全曝光,对标奔驰、迈巴赫等

文 | Auto芯球 作者 | 雷慢 这一刻&#xff0c;我承认我格局小了&#xff0c; 就在刚刚&#xff0c;余承东曝光了华为智选车的布局计划&#xff0c; 华为问界、智界、享界等&#xff0c;将全面对标奔驰、迈巴赫、劳斯莱斯等车系&#xff0c; 这布局&#xff0c;确实是世界…

Spring MVC+mybatis 项目入门:旅游网(二) dispatcher与controller与Spring MVC

个人博客&#xff1a;Spring MVCmybatis 项目入门:旅游网&#xff08;二&#xff09;dispatcher与controller与Spring MVC | iwtss blog 先看这个&#xff01; 这是18年的文章&#xff0c;回收站里恢复的&#xff0c;现阶段看基本是没有参考意义的&#xff0c;技术老旧脱离时代…

大规模团队的数据库开发,如何用OceanBase工具快速建立企业级账号体系

前言 为了让数据库开发的安全性与可靠性得以充分保障&#xff0c;数据库开发工具的管控能力显得尤为关键。构建一个健全的账号体系&#xff0c;能够协助开发团队实现对数据库开发工具的全方位管控&#xff0c;从而有效防范各类数据安全隐患&#xff0c;确保数据库开发的顺利进…

深度神经网络——什么是混淆矩阵?

概述 混淆矩阵是一种在机器学习和数据科学中广泛使用的分析工具&#xff0c;用于评估分类模型的性能。它通过比较实际类别和模型预测的类别来提供模型性能的详细信息。以下是混淆矩阵的一些关键点&#xff1a; 结构&#xff1a;混淆矩阵是一个表格&#xff0c;通常有两行两列&…

毫米波雷达模块在智能家居安全系统中的关键技术分析

随着智能技术的不断发展&#xff0c;智能家居已经成为了现代生活的一部分。而智能家居安全系统作为智能家居的重要组成部分&#xff0c;其功能不仅仅是对家庭进行监控和报警&#xff0c;更是通过各种感知技术对家庭安全进行全方位的保障。在智能家居安全系统中&#xff0c;毫米…

Go语言的中间件(middleware)是如何实现的?

文章目录 Go语言的中间件&#xff08;Middleware&#xff09;是如何实现的&#xff1f;中间件的工作原理中间件的实现步骤示例代码总结 Go语言的中间件&#xff08;Middleware&#xff09;是如何实现的&#xff1f; 在Go语言中&#xff0c;中间件&#xff08;Middleware&#…

C语言学习笔记之指针(一)

目录 什么是指针&#xff1f; 指针和指针类型 指针的类型 指针类型的意义 指针-整数 指针的解引用 指针 - 指针 指针的关系运算 野指针 什么是野指针&#xff1f; 野指针的成因 如何规避野指针&#xff1f; 二级指针 什么是指针&#xff1f; 在介绍指针之前&#…

使用Pyecharts构建Map对象无法显示颜色--解决

我们在做数据可视化的过程中&#xff0c;可能需要使用到地图作为数据可视化的工具&#xff1b; 包括世界地图、国家地图、省市区地图等&#xff1b; 如果在你设置好颜色数据匹配后&#xff0c;可视化地图未显示对应数据的颜色&#xff0c;那么请检查是否出现以下情况&#xf…

Hadoop运行wordcount实例任务卡在job running的多种情况及解决方法

第一种&#xff1a;配置问题 这是别人的图片&#xff0c;据楼主排查解决是因为hosts配置问题… 现象&#xff1a;各种无法运行、启动 解决办法&#xff1a; 1、修改日志级别 export HADOOP_ROOT_LOGGERDEBUG,console 查看下详细信息&#xff0c;定位到具体问题解决 第二种&…

【JavaSE】/*类和对象(上)*/

目录 一、什么是类&#xff0c;什么是对象 二、类和对象的关系 三、学习类和对象的目的 四、怎样创建一个类 4.1 语法形式 4.2 创建示例 示例一&#xff1a;日期对象 示例二&#xff1a;小狗对象 示例三&#xff1a;学生对象 4.3 注意事项 4.4 修改public修饰的主类…

Day06:Flex 布局

目标&#xff1a;熟练使用 Flex 完成结构化布局 一、标准流 标准流也叫文档流&#xff0c;指的是标签在页面中默认的排布规则&#xff0c;例如&#xff1a;块元素独占一行&#xff0c;行内元素可以一行显示多个。 二、浮动 1、基本使用 作用&#xff1a;让块元素水平排列。 …

如何学习计算机网络(超详细,方法论)

分享一下学习计算机网络的方法论 首先是看视频&#xff1a; 这里我推荐中科大郑烇、杨坚全套《计算机网络&#xff08;自顶向下方法 第7版》课程 课程目标_哔哩哔哩_bilibili 教材采用神书《计算机网络&#xff08;自顶向下方法&#xff09;》&#xff0c;授课风格更偏向实…

The view model in Acise

在FreeCAD中&#xff0c;借助于Boost Signals2实现了业务层、显示层的分层&#xff0c;但整个FreeCAD Gui层却采用了Coin3D进行渲染&#xff0c;很难进行在这方面进行扩展。 相较之下&#xff0c;在SALOME中&#xff0c;可以为不同的Module指定特定的ViewModel&#xff0c;支持…

uniapp App去除iOS底部安全区域白边

未设置的情况下&#xff0c;iOS底部安全区域白边 如图&#xff1a; 去除方法&#xff1a; 在 mainfest.json 中加入一下代码&#xff1a; "safearea" : {"bottom" : {"offset" : "none"} }, 去除效果展示&#xff1a;

翻译AnyDoor: Zero-shot Object-level Image Customization

摘要 本研究介绍了AnyDoor&#xff0c;这是一款基于扩散模型的图像生成器&#xff0c;能够在用户指定的位置&#xff0c;以期望的形状将目标对象传送到新场景中。与为每个对象调整参数不同&#xff0c;我们的模型仅需训练一次&#xff0c;就能在推理阶段轻松地泛化到多样化的对…

能找伴侣的相亲婚恋平台有哪些?6款值得信赖的恋爱交友软件体验测评

在这个超快节奏的社会里&#xff0c;好多人都忙着搞事业和搞钱&#xff0c;却把终身大事给忽略了。但是随着年龄越来越大&#xff0c;来自长辈和社会的压力也越来越大&#xff0c;因此网络上的相亲交友软件&#xff0c;就成了大多数单身贵族的脱单首选了。下面就来给大家讲讲我…

C#的奇技淫巧:利用WinRM来远程操控其他服务器上的进程

前言&#xff1a;有时候远程服务器的进程你想偷偷去围观一下有哪些&#xff0c;或者对一些比较调皮的进程进行封杀&#xff0c;或者对一些自己研发的服务进行远程手动启动或者重启等&#xff0c;又不想打开远程桌面&#xff0c;只想悄咪咪地执行&#xff0c;那也许下面的文章会…

重开之数据结构(二刷)

引言: 由于前段时间学习效率不高,导致后面复习前面数据结构没有一个大纲,因此打算重新来学习以下数据结构,期望再次把数据结构学透,并有深刻的印象.并且记录每一次的学习记录 以便于后续复习 二分查找 需求:在有序数组arr内,查找target值 如果找到返回索引位置如果找不到返回…

详细分析Element中的MessageBox基本知识(附Demo)

目录 前言1. 基本知识2. Demo2.1 确认框2.2 警告框2.3 对话框 3. this.$confirm 前言 详细知识推荐阅读&#xff1a;详细分析Element Plus中的ElMessageBox弹窗用法&#xff08;附Demo及模版&#xff09; MessageBox则常用于Vue2 1. 基本知识 MessageBox 是 Element UI 提供…