工具篇7--RocketMq消息模型介绍

news2025/1/12 12:28:14

文章目录

  • 前言:
  • 一、RocketMq是什么?
  • 二、RocketMq 模型介绍:
    • 1.RocketMq 模型图:
    • 2.RocketMq 生产者:
      • 2.1 生产者消费发送流程:
      • 2.2 生产者消息发送:
        • 2.2.1 同步发送普通消息:
        • 2.2.1 异步发送普通消息:
          • 2.2.1.1 定义callback 回调:
          • 2.2.1.2 消息发送时添加回调:
        • 2.2.2 发送延迟消息:
          • 2.2.2.1 同步/异步发送延迟消息:
          • 2.2.2.2 延迟消息实现原理:
        • 2.2.3 发送顺序消息:
          • 2.2.3.1 顺序消息发送:
          • 2.2.3.2 顺序消息实现原理:
        • 2.2.4 发送事务消息:
          • 2.2.4 .1 发送消息:
          • 2.2.4 .2 事务消息发送实现原理:
    • 3.RocketMq Broker 消息存储:
      • 3.1 消息存储结构:
      • 3.2 消息存储流程:
      • 3.3 每个topic 中消息读写队列的数量:
      • 3.4 消息的删除:
    • 4.RocketMq 消费者:
      • 4.1 @RocketMQMessageListener 注解消费消息:
      • 4.2 @RocketMQMessageListene 属性详解:
  • 三、总结:
  • 四、参考:


前言:

作为一个国产的消息中间件,阿里参考Kafka 开发了RocketMq 进行消息通信,它具有分布式,高可用、高吞吐、可伸缩的特性,本文对于其模型进行探究;


一、RocketMq是什么?

RocketMQ是一个非常优秀的分布式消息传递平台,能够帮助开发人员实现高性能、可靠的消息传递和流处理。它在互联网公司、金融机构和其他大型企业中广泛使用。

二、RocketMq 模型介绍:

1.RocketMq 模型图:

在这里插入图片描述
模型图中已经展示了RocketMq的几个重要组件,与kafka 不同的是这里多出了一个叫NameServer的组件;这里针对NameServer 的特点进行下说明 :

  • 对broker 进行维护,并对消息消息进行路由;
  • NameServer 需要保证高可用,每个nameserver 都保存了所有broker 的信息,所有name sever 之间是不需要进行通信的,每个broker 都和所有的namesever 保持有心跳,每个nameserver 每隔10s 也会去 排查broker 如果 broker 超过120s 仍然没有与nameserver 进行心跳连接,则认为改broker 已经挂掉,name server 进行剔除;
  • 生产者和消费者从namse server 获取到要连接的broker 信息,然后在本地进行一个缓存,然后与某一个broker 建立tcp 的长链接,进行消息的发送和接收;

2.RocketMq 生产者:

2.1 生产者消费发送流程:

生产者从NameServer 获取到Broker 信息, 创建tcp 连接,然后进行消息的发送,下面分场景使用RocketMQTemplate 模版对象进行消息的发送;

2.2 生产者消息发送:

2.2.1 同步发送普通消息:

 /** 单条
* 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
 * sendResult为返回的发送结果
 */
public <T> SendResult sendMsg(String topic, T msg) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
    log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
    return sendResult;
}
/**
* 同步发送批量消息
 * @param topic
 * @param msgs
 * @return
 * @param <T>
 */
public <T> SendResult sendBatchMsg(String topic, List<Object> msgs) {
    List<Message> messages = msgs.stream().map(msg -> {
        Message<T> message = (Message<T>) MessageBuilder.withPayload(msg).build();
        return message;
    }).collect(Collectors.toList());
    return sendBatchMsg(topic, null, msgs);
}

/**
 * 同步发送批量消息
 * @param topic
 * @param tag
 * @param msgs
 * @return
 * @param <T>
 */
public <T> SendResult sendBatchMsg(String topic, String tag, List<Object> msgs) {
    if (StringUtils.isNotBlank(tag)) {
        topic = topic.concat(":") + tag;
    }
    SendResult sendResult = rocketMQTemplate.syncSend(topic, msgs);
    log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
    return sendResult;
}

2.2.1 异步发送普通消息:

2.2.1.1 定义callback 回调:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ProducerRocketMQCallback implements SendCallback {
    @Override
    public void onSuccess(SendResult sendResult) {
        //   异步发送消息成功回调
        log.debug("Message send successfully! Message ID: {}", sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable throwable) {
        // 异步发送消息失败回调
        log.error("Message send failed! Error message: {}", throwable.getMessage());
    }
}

2.2.1.2 消息发送时添加回调:
@Autowired
private ProducerRocketMQCallback callback;
/**
 * 发送异步消息
 *
 * @param topic
 * @param msgBody
 */
public void sendAsyncMsg(String topic, Object msgBody) {
    sendAsyncMsg(topic, null, msgBody, callback);
}

/**
 * 发送异步消息
 *
 * @param topic
 * @param tag
 * @param msgBody
 * @param callback
 */
public void sendAsyncMsg(String topic, String tag, Object msgBody, SendCallback callback) {
    if (StringUtils.isNotBlank(tag)) {
        topic = topic.concat(":") + tag;
    }
    rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), callback);
}

/**
 * 发送异步消息
 *
 * @param topic        消息Topic
 * @param message      消息实体
 * @param sendCallback 回调函数
 * @param timeout      超时时间
 */
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
    rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}

/**
* 发送异步消息-- 批量
*
* @param topic
* @param msgs
*/
public void sendAsyncBatchMsg(String topic, List<Object> msgs) {
   List<Message> messages = msgs.stream().map(msg -> {
       Message message = MessageBuilder.withPayload(msg).build();
       return message;
   }).collect(Collectors.toList());
   sendAsyncBatchMsg(topic, null, messages);
}
/**
* 发送异步消息--批量
*
* @param topic
* @param tag
* @param msgs
*/
public void sendAsyncBatchMsg(String topic, String tag,List<Message> msgs) {
   if (StringUtils.isNotBlank(tag)) {
       topic = topic.concat(":") + tag;
   }
   rocketMQTemplate.asyncSend(topic, msgs, callback);
}

2.2.2 发送延迟消息:

2.2.2.1 同步/异步发送延迟消息:
/**
 * 发送延时消息-- 同步发送
 * @param topic
 * @param msgBody
 * @param delayLevel
 */
public void sendDelayMsg(String topic, Object msgBody,  Integer delayLevel){
    sendDelayMsg(topic,null,msgBody,null,delayLevel);
}
/**
 * 发送延时消息
 * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
 *
 * @param topic
 * @param tag
 * @param msgBody
 * @param timeout
 * @param delayLevel 值的有效范围1至18
 */
public void sendDelayMsg(String topic, String tag, Object msgBody, Long timeout, Integer delayLevel) {
    if (StringUtils.isNotBlank(tag)) {
        topic = topic.concat(":") + tag;
    }
    if (timeout == null) {
        timeout = messageTimeOut;
    }
    rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), timeout, delayLevel);
}

/**
 * 发送异步延迟消息
 * @param topic
 * @param msgBody
 * @param delayLevel
 */
public void asyncSendDelay(String topic, Object msgBody,  Integer delayLevel){
    asyncSendDelay(topic, MessageBuilder.withPayload(msgBody).build(),null,delayLevel);
}


/**
 * 发送异步延迟消息
 *
 * @param topic      消息Topic
 * @param message    消息实体
 * @param timeout    超时时间
 * @param delayLevel 延迟消息的级别
 */
public void asyncSendDelay(String topic, Message<?> message, Long timeout, int delayLevel) {
    if (null == timeout){
        timeout = messageTimeOut;
    }
    rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("topic:{}消息---发送MQ成功---", topic);
        }

        @Override
        public void onException(Throwable throwable) {
            log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
        }
    }, timeout, delayLevel);
}
2.2.2.2 延迟消息实现原理:

在这里插入图片描述

  • 生产者发送消息在一个临时topic 进行存储;
  • delay server 定时任务扫描;
  • 时间到了发送到指定的topic
  • 消费者消费到消息;

2.2.3 发送顺序消息:

2.2.3.1 顺序消息发送:

提示:callback 方法为之前发送异步消息定义的回调方法

/**
 * 发送顺序消息
 *
 * @param topic   消息主题
 * @param msg     消息体
 * @param hashKey 确定消息发送到哪个队列中
 * @param <T>     消息泛型
 */
public <T> void syncSendOrderly(String topic, T msg, String hashKey) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);
    rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
/**
 * 发送顺序消息--异步
 *
 * @param topic   消息主题
 * @param msg     消息体
 * @param hashKey 确定消息发送到哪个队列中
 * @param <T>     消息泛型
 */
public <T> void asyncSendOrderly(String topic, T msg, String hashKey ) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);
    rocketMQTemplate.asyncSendOrderly(topic, message, hashKey,callback);
}


/**
 * 发送顺序消息
 *
 * @param topic   消息主题
 * @param msg     消息体
 * @param hashKey 确定消息发送到哪个队列中
 * @param timeout 超时时间
 */
public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    log.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
    rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}
2.2.3.2 顺序消息实现原理:

1)实现思路:
1、生产者发送消息的时候,到达Broker应该是有序的。所以对于生产者,不能使用多线程异步发送,而是单线程顺序发送;
2、写入Broker的时候,应该是顺序写入的。也就是相同主题的消息应该集中写入,选择同一个Message Queue,而不是分散写入;
3、消费者消费的时候只能有一个线程。否则由于消费的速率不同,有可能出现记录到数据库的时候无序;

2)消息发送队列的实现:
在这里插入图片描述
提示:默认队列选择同对 hashKey 的hashcode 值然后对队列的个数 取模,得到要发送的队列

3)rocketmq 实现:
通过定义hashKey(可以使用相同的hashKey) 将消息发送到同一个队列,然后同一个消费组中消费者,消费这个队列,完成顺序消费;

2.2.4 发送事务消息:

2.2.4 .1 发送消息:

1) 增加消息发送的监听:


import com.alibaba.fastjson.JSONObject;
import com.example.springrocket.dto.MessageDto;
import com.example.springrocket.dto.ResultTrancDto;
import com.example.springrocket.service.MyTranctionService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MyTransactionListener implements TransactionListener {
    @Autowired
    private MyTranctionService myTranctionService;
	 
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        //本地事务
        String msg = new String(message.getBody());
        MessageDto messageDto = JSONObject.parseObject(msg,MessageDto.class);
        try {
        	// 本地事务调用
            ResultTrancDto res =  myTranctionService.doTransac(messageDto);
            if (res.isSuccess()) {
                // 本地事务执行成功,提交半消息
                log.debug("本地事务执行成功,提交事务事务消息");
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                // 本地事务执行失败,回滚半消息
                log.debug("本地事务执行失败,回滚事务消息");
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            // 异常情况返回未知状态
            return LocalTransactionState.UNKNOW;
        }

    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String msg = new String(messageExt.getBody());
        MessageDto messageDto = JSONObject.parseObject(msg,MessageDto.class);
        // 本地事务核查
        if (myTranctionService.checkTransac(messageDto)){
            // 已经完成处理
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        // 数据库没有处理完毕,回滚事务
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

2 ) 本地事务发送和核查 demo:
2.1 )MyTranctionService.java

package com.example.springrocket.service;

import com.example.springrocket.dto.MessageDto;
import com.example.springrocket.dto.ResultTrancDto;

public interface MyTranctionService {

    ResultTrancDto doTransac( MessageDto messageDto);

    boolean checkTransac(MessageDto messageDto);
}




2.2 )MyTranctionServiceImpl .java

package com.example.springrocket.service.impl;

import com.example.springrocket.dto.MessageDto;
import com.example.springrocket.dto.ResultTrancDto;
import com.example.springrocket.service.IUserService;
import com.example.springrocket.service.MyTranctionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MyTranctionServiceImpl implements MyTranctionService {
    @Autowired
    private IUserService userService;
    @Override
    public ResultTrancDto doTransac(MessageDto messageDto) {
        userService.saveUser(messageDto.getBody().toString());
        return new ResultTrancDto(true,"");
    }

    @Override
    public boolean checkTransac(MessageDto messageDto) {
        return  userService.checkName(messageDto.getBody().toString());
    }
}

2.3 )UserServiceImpl.java

package com.example.springrocket.service.impl;


import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.springrocket.entity.User;
import com.example.springrocket.mapper.UserMapper;
import com.example.springrocket.service.IUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.List;

/**
 * <p>
 * 用户 服务实现类
 * </p>
 *
 * @author lgx
 * @since 2023-06-15
 */
@Slf4j
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean saveUser(String msg) {
        for (int i=0;i<2;i++){
            User user = new User();
            user.setName(msg+"_"+i).setAge(100);
            this.save(user);
        }
        return true;
    }

    @Override
    public boolean checkName(String msg) {
        List<String> names= new ArrayList<>(1<<2);
        for (int i=0;i<2;i++){
            names.add(msg+"_"+i);
        }
        LambdaQueryWrapper<User> wrapper = new LambdaQueryWrapper();
        wrapper.in(User::getName,names);
        return this.count(wrapper) == 2;
    }
}

3 )RocketMQTemplate 增加监听:

@PostConstruct
private void configProducer(){
    ( (TransactionMQProducer)rocketMQTemplate.getProducer()).setTransactionListener(transactionListener);
}

4 )事务消息发送:

public <T> void sendTransactionMessage(String topic, T msg) {
    Message<T> message = MessageBuilder.withPayload(msg).build();
    // 发送事务消息
    rocketMQTemplate.sendMessageInTransaction(topic, message,"producuer-01");
}
2.2.4 .2 事务消息发送实现原理:

在这里插入图片描述

  • 调用sendMessageInTransaction 方法进行消息的发送;
  • 发送完成,进入监听器executeLocalTransaction 方法,这个方法中执行本地事务;
  • 本地事务执行成功,则提交事务,失败则回滚事务,异常则发送unknown;
  • broker 根据提交的事务类型选择,投递,删除消息,如果是unknown 则向客户端发起查询;
  • 如果是unknown 进入客户端的checkLocalTransaction 方法进行事务的回查,然后提交或者回滚事务;

3.RocketMq Broker 消息存储:

3.1 消息存储结构:

1) topic 下的队列:
RocketMq 中的消息都存储与topic 中,topic是逻辑存储,每个topic下有多个队列,topic 主题在逻辑上是队列的集合;队列是 Apache RocketMQ 消息的最小存储单位;
2 )队列可以定义的属性:

  • 读写。消息可以写入当前队列,也可以从当前队列中读取。

  • 只读。可以从当前队列读取消息,但不能写入当前队列。

  • 只写。消息可以写入当前队列,但不能从当前队列中读取。

  • 读取或写入状态不可用。当前队列不允许读取或写入操作。

3)每个队列的结构:
在这里插入图片描述
每个队列对维护了存储消息的CommitLog文件,以及消费者对于读队列消费的偏移量,并维护了消息与磁盘之间的索引关系;

3.1)写队列:
多个消息存储(Message Store File)组成,每个消息存储文件中包含多条消息。一个消息存储文件通常包括四个部分:CommitLog、ConsumeQueue、IndexFile 和 HashSlotFile。具体组成如下:

CommitLog:CommitLog 是写队列中最重要的组成部分,也是消息存储文件中占用空间最大的部分。是一个只写的追文件,用于存储 Producer 发送的消息。新增的消息会被追加到文件末尾,CommitLog 中每条消息在写入时都会有唯一的 offset,Consumer 在消费时以 offset 为基准来寻找消息。CommitLog 的写入是顺序写,相对于随机写入来说速度更快。

ConsumeQueue:ConsumeQueue 用于消费者消费消息时的快速定位,是写队列的消息索引文件。MQ会根据 ConsumeQueue 中消息的 offset 信息快速定位到 CommitLog 中存储对应消息的位置,从而快速拉取到需要的消息。ConsumeQueue 是一个只读的文件,消息在消费时不会被删除,只会打上标记表示已被消费。RocketMQ 为每个 Topic 和 Consumer Group 创建一个 ConsumeQueue。

IndexFile:IndexFile 也是写队列的消息索引文件,用于支持随机读取消息。当 Consumer 并非按照顺序消费消息时,可以从 IndexFile 中查找 offset 等信息,找到所需的消息进行消费。IndexFile 是一个只读的文件,它记录了每个 ConsumeQueue 文件中第一条消息的 offset 以及对应的物理位置,方便 Consumer 在指定 offset 位置开始消费消息。

HashSlotFile:HashSlotFile 是 ConsumeQueue 和 IndexFile 的辅助文件,用于实现消息的定位和检索。HashSlotFile 中存储了 Topic 中所有消息的 Topic 名称和消息 ID,它把消息的 Topic 名称和消息 ID 作为 Key 进行 Hash 计算,将其分配到不同的 Hash Slot 中。每个 Hash Slot 对应一个 ConsumeQueue 文件和一个 IndexFile 文件,这样就可以快速地根据 Hash Slot 查找到对应的 ConsumeQueue 文件和 IndexFile 文件,从而实现快速定位和检索消息。

综上所述,一个写队列内部由多个消息存储文件组成,主要包括 CommitLog、ConsumeQueue、IndexFile 和 HashSlotFile 四个部分,它们协同工作,实现消息的存储、索引、定位和检索等功能。

3.2)读队列:
在 RocketMQ 中,读队列中的消息存储(Message Store File)也由多个文件组成,主要包括 ConsumeQueue、IndexFile 和 CommitLog 三个部分。

ConsumeQueue:ConsumeQueue 也是读队列的核心组成部分,它用于记录读取者已经消费的消息位置,方便重复消费以及快速定位下一条未消费的消息。ConsumeQueue 在读队列中的组成与写队列中的有所不同,它不再以文件的方式存储,而是以 Map 集合的方式存储在内存中。

IndexFile:IndexFile 也是读队列中的组成部分,用于支持快速定位和检索消息。IndexFile 中记录了消息的 offset、物理位置以及消息的存储时间等信息,可以根据 offset 或时间戳快速定位到一条消息的位置。

CommitLog:CommitLog 也是读队列中的组成部分,它存储了消息的实际内容。当消费者找到一条消息的 offset 后,可以通过 IndexFile 找到消息的物理位置,在 CommitLog 中查找到消息的实际内容并进行消费。

综上所述,一个读队列内部由 ConsumeQueue、IndexFile 和 CommitLog 三个部分组成。ConsumeQueue 用于记录读者已经消费的消息位置,IndexFile 用于快速定位和检索消息,CommitLog 则是消息实体的存储位置。通过这三个部分的协同工作,消费者可以快速地读取并消费消息,实现消息的高效传递和处理。

3.2 消息存储流程:

  • Producer 发送一条消息到达broker 中,根据消息队列属性,将消息存放到某个队列中;
  • 将该条消息追加到该队列CommitLog 文件(CommitLog 默认会以1G 大小进行分隔);
  • 维护消息与磁盘关系的索引到IndexFile 中;
  • 写队列中的消息同步到对应的读队列中,方便后续消费者从读队列获取消息;

3.3 每个topic 中消息读写队列的数量:

写队列的数量相当于分区的数量,默认8个,客户端发送消息创建默认4个,并且每个broker 中 都有这个队列数量的存储目录,队列的数量= 写队列的数量* broker 数量;
当写队列的个数大于读队列的个数时,会出现某些读队列上的消息无法被及时消费的情况,导致消息积压。因为当 Producer 发送消息时,RocketMQ 会将消息平均地分配到所有的可用写队列中,如果写队列的个数大于读队列的个数,那么某些读队列上的消息就无法及时被消费掉。另外,当读队列上有消息堆积时,可能会影响系统的性能和稳定性。

当写队列的个数小于读队列的个数时,则可能会导致某些队列空闲的情况,无法充分利用资源。因为如果写队列的个数小于读队列的个数,那么某些读队列可能会一直没有消息可消费,导致消费者无事可做,浪费系统资源。

因此,为了实现最佳性能和资源利用率,通常建议将读队列和写队列的个数尽可能地设置成同样的数量。在实际应用中,可以通过调整 Topic 和 Message Queue 的参数,来动态地增加或减少队列数量,以达到最佳的消息传递和处理效果。

3.4 消息的删除:

RocketMQ Broker 端支持的消息删除策略主要有三种:

1.) 定时删除策略(Time-based deletion strategy):在 Broker 的配置文件中设置消息保存时间(默认为 72 小时),超过这个时间的未被消费的消息将被删除。这种策略适用于对消费实时性没有特别的要求的场景,可以让消息自然过期删除。

2.) 空间删除策略(Space-based deletion strategy):在 Broker 的配置文件中设置一定的磁盘空间大小阈值,当磁盘空间使用超过阈值时,Broker 会删除一部分相对“冷门”的消息。这种策略适用于对消费实时性及消息可靠性要求高的场景,可以减少磁盘占用率和提高消息的重要性,默认磁盘使用率超过90% 直接拒绝消息写入。

3.) 主动删除策略(Manually deletion strategy):通过手动指定消息 ID 或时间戳删除特定消息。这种方式适用于需要精细控制消息存储和删除的场景,可以通过代码或 RocketMQ 控制台等方式实现。

需要注意的是,无论采用哪种消息删除策略,在删除消息时都需要确保消息已经被消费或不再需要消费,避免出现消息丢失或错误的情况。另外,为了提高消息存储和删除的效率,也可以使用消息索引、批量删除等技术手段来减少操作次数和提高操作效率。

4.RocketMq 消费者:

4.1 @RocketMQMessageListener 注解消费消息:

1) 自动提交ack:

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer_test",
        topic = "test1", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.CLUSTERING,
        selectorExpression = "*")
public class RocketMqConsumerTest implements RocketMQListener<MessageExt> {
	 @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.debug("监听到消息:message:{}", msg);
        // 手动提交 ACK
        ConsumeConcurrentlyContext context = RocketMQUtil.getLocalContext();
        if (context != null) {
            List<MessageExt> msgs = context.getMsgs();
            rocketMQUtil.ack(msgs);
        }
    }
}

默认情况下,RocketMQ 自动提交 Ack 是在业务代码执行完成之后提交的。也就是说,只有当 consumeMessage 方法成功返回后,才会自动提交 Ack,表示这个消息已经被成功消费。如果业务代码执行过程中出现了异常,消息将不会被标记为已消费,被重新投递给 Broker,等待下次消费。

2) 手动提交ack:

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer_test",
        topic = "test1", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.CLUSTERING,
        selectorExpression = "*")
public class RocketMqConsumerTest implements MessageListenerConcurrently {
	@Override
	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
	
	 for (int i = 0; i < msgs.size(); i++) {
	      try {
	          // 打印消息内容
	          String msgStr = new String(msgs.get(i).getBody());
	          log.debug("监听到消息:message:{}", msgStr);
	
	      } catch (Exception e) {
	          e.printStackTrace();
	          // 手动提交 Ack 时出错,进行消息重试
	          context.setAckIndex(i--);
	          return ConsumeConcurrentlyStatus.RECONSUME_LATER;
	      }
	  }
	
	  // 消费成功,返回 ACK
	  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	}
}

context对象的成员变量ackIndex的含义是:最后一个正常消费的消息索引号。索引号的最大值为传入的消息数量-1。通过context.setAckIndex(n),可以说明哪个索引号之后的消息消费失败。 这样从索引ackIndex往后的消息会被重新发送至broker,等待下一次消费。 然后从ProcessQueue中移除消费过的消息。 最后更新最新的offset至RemoteBrokerOffsetStore。

4.2 @RocketMQMessageListene 属性详解:

1.) consumerGroup:消费者组名,必填项。该值需要在同一个应用程序中唯一,用于区分不同的消费者。

2.)topic:待消费的消息主题,必填项。

3.)selectorExpression:消息过滤表达式,可选。如果指定了该属性,RocketMQ 服务器会只将符合该表达式的消息推送给消费者,其他消息会被过滤掉。如果不指定该属性,则表示消费所有消息。

4.) messageModel:消息模式,可选。支持两种模式:集群模式(默认值)和广播模式。集群模式下,同一个消费者组的多个消费者分摊消费消息;而广播模式下,同一个消费者组的多个消费者都会收到消息。

5.) consumeMode:消费模式,可选。共有三种模式:顺序消费、并发消费和异步阻塞消费。默认值是 CONCURRENTLY,表示并发消费。顺序消费模式会确保消费者按照发送顺序依次消费消息,适用于有序消息场景。异步阻塞消费模式则意味着消费者线程会被阻塞,直到消息处理完毕才会继续消费。

6.) consumeThreadMax:最大消费线程数,可选。默认值是 64,表示同一时刻最多有 64 条消息能够被消费。如果需要扩大消费能力,可以适当增大该属性的值。

7.) pullThresholdForQueue:消息拉取阈值,可选。默认值是 1000,表示每个消费者拉取一条消息之后,会立即再拉取 999 条消息保存在本地缓存中。当本地缓存中的消息处理完毕之后,消费者会再次拉取 1000 条消息。如果待消费的消息比较大,可以适当调大该属性的值,减少拉取消息的次数。

8.) pullInterval:拉取消息间隔时间,可选。默认值是 0,表示拉取消息之前不需要等待。如果待消费的消息比较少,可以把该属性值调大,节省网络开销,减轻 RocketMQ 服务器的压力。

9.) maxReconsumeTimes:消息重试次数上限,可选。默认值是 16,表示当某条消息消费失败时,自动进行重试,最多重试 16 次。如果达到重试次数上限还是无法消费该条消息,则将该消息保存在死信队列中。可以通过修改该属性来设置不同的重试机制。

10.) delayLevelWhenNextConsume:下一次消息消费的延时等级,可选。默认值是 0,表示消息不需要延时再次消费。如果待消费的消息经过一段时间仍然不能成功处理,则可以设置该属性来减少 RocketMQ 服务器的压力。延时等级越高,表示下一次消息消费的时间越晚。比如,delayLevelWhenNextConsume=5,表示下一次消息消费时间会晚上 5 分钟。

11.) accessChannel:接入通道,默认为本地通道(InProcess)。目前,RocketMQ 还支持远程通道(Cloud),将消息发送到云上的 RocketMQ 集群中。

12.) messageModel:消息分发模式,可选项,支持两种消息分发模式:集群模式和广播模式,默认为集群模式。

13.) enableVIPChannel:是否启用 VIP 通道,可选项,默认为 true,将会启用 VIP 通道来保

三、总结:

以上就是今天要讲的内容,本文介绍了RocketMq消息模型中组成的组件,并对每个组件单独进行了介绍和使用;具体springboot 项目中的使用可以参考:springboot 整合Rocketmq。

四、参考:

RocketMQ 官网文档;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/690210.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI大模型及算力要求

AI大模型对算力的要求非常高&#xff0c;需要高性能的硬件设备和分布式训练技术来支持。随着AI技术的不断发展&#xff0c;未来可能会出现更大、更复杂的模型&#xff0c;对算力的要求也将更高。今天和大家分享几个大模型及算力要求&#xff0c;希望对大家有所帮助。北京木奇移…

DETR系列:RT-DETR实战部署

上篇文章介绍RT-detr的论文内容&#xff08;RT-DETR 论文解析&#xff09;&#xff0c;本篇文章介绍算法复现、tensorRT加速、python调用部署、训练等方法。 RT-DETR实战部署 1.复现模型详情2.环境准备3.训练4.部署5.测试 1.复现模型详情 本次复现主要测试下表中RT-DETR-R50和…

Kafka集群模式核心概念

文章目录 1.Kafka集群模式下Broker|主题|分区|副本的概念1.1.Broker|主题|分区|副本的概念1.2.创建一个Topic指定3个副本数1.3.多副本的Topic详细信息描述 2.集群模式下以消费者组消费Topic中各分区消息的概念2.1.分消费者组消费各分区的概念2.2.集群模式下消息的发送和消费 3.…

Go 语言中 Context 的作用和使用方法详解

KDP&#xff08;数据服务平台&#xff09;是一款由 KaiwuDB 独立自主研发的数据服务产品&#xff0c;以 KaiwuDB 为核心&#xff0c;面向 AIoT 场景打造的一站式数据服务平台&#xff0c;满足工业物联网、数字能源、车联网、智慧产业等行业核心业务场景下数据采集、处理、计算、…

在Azure SQL DB/Azure托管实例里快速查询各数据库大小以及每个数据库下表的大小

目录 &#xff08;一&#xff09;前言 &#xff08;二&#xff09;正文 1. 环境&#xff1a; 2. 查看实例下每个数据库的空间大小 &#xff08;1&#xff09; SQL语法 &#xff08;2&#xff09;运行结果 3. 查看特定数据库下每张表的大小 &#xff08;1&#xff09;SQ…

一个sql中的一张表,最多只会走一个索引吗

目录 先给结论 做实验 1.根据时间范围查询 什么是key_len&#xff1f; 2.根据时间范围和 is_delete 查询 最左匹配原则 2.根据时间范围和 blog_type 查询 如果加上id会怎么样 并不是索引一定会走 1.IN子表数量过多 2.单次查询超过30% 先给结论 先说结论&#xff0c;…

设计模式第14讲——享元模式(Flyweight)

目录 一、什么是享元模式 二、角色组成 三、优缺点 四、应用场景 4.1 生活场景 4.2 java场景 五、代码实现 5.0 代码结构 5.1 Bike——抽象享元类&#xff08;FlyWeight&#xff09; 5.2 具体享元类&#xff08;ConcreteFlyWeight&#xff09; 5.3 BikeFactory——享元…

layui框架学习(28:穿梭框模块)

Layui模块中的穿梭框模块transfer主要支撑穿梭框组件的显示、交互等操作。所谓穿梭框是指左右各有一个复选框列表&#xff0c;可以将左侧选中的项目移动到右边&#xff0c;后者将右侧的选中项移回左边的控件&#xff0c;其样式类似下图所示&#xff08;参考文献5-6&#xff09;…

TI AM62x工业开发板规格书(单/双/四核ARM Cortex-A53 + 单核ARM Cortex-M4F,主频1.4GHz)

1 评估板简介 创龙科技TL62x-EVM是一款基于TI Sitara系列AM62x单/双/四核ARM Cortex-A53 单核ARM Cortex-M4F多核处理器设计的高性能低功耗工业评估板&#xff0c;由核心板和评估底板组成。处理器ARM Cortex-A53(64-bit)主处理单元主频高达1.4GHz&#xff0c;ARM Cortex-M4F实…

如何使用 Flink SQL 探索 GitHub 数据集|Flink-Learning 实战营

为进一步帮助开发者学习使用 Flink&#xff0c;Apache Flink 中文社区近期发起 Flink-Learning 实战营项目。本次实战营通过真实有趣的实战场景帮助开发者实操体验 Flink&#xff0c;课程包括实时数据接入、实时数据分析、实时数据应用的场景实。并结合小松鼠助教模式&#xff…

USR-C216 WIIF连接手机

复位后连接USR-C216无线 浏览器输入10.10.100.254 账号密码为admin 客户端模式服务器地址无效&#xff0c;默认就行 打开手机网络调试助手选择客户端模式&#xff0c;输入10.10.100.254&#xff0c;端口8899 可以透传了 关于AT指令&#xff0c;先发“”&#xff0c;然后3s内发…

【数据管理架构】什么是 OLTP?

OLTP&#xff08;在线事务处理&#xff09;支持在 ATM 和在线银行、收银机和电子商务以及我们每天与之交互的许多其他服务背后进行快速、准确的数据处理。 什么是 OLTP&#xff1f; OLTP 或在线事务处理允许大量人员&#xff08;通常通过 Internet&#xff09;实时执行大量数据…

基于Vue+Node.js的宠物领养网站的设计与开发-计算机毕设 附源码83352

基于VueNode.js的宠物领养网站的 摘 要 随着互联网大趋势的到来&#xff0c;社会的方方面面&#xff0c;各行各业都在考虑利用互联网作为媒介将自己的信息更及时有效地推广出去&#xff0c;而其中最好的方式就是建立网络管理系统&#xff0c;并对其进行信息管理。由于现在网络…

【国产FPGA应用】紫光Pango Design联合 Modelsim 仿真方法

Modelsim 是 FPGA 开发中重要的 EDA 设计仿真工具&#xff0c;主要用于验证数字电路设计是否正确。我们经常用Xilinx的ISE或者Vivado与Modelsim进行联合仿真&#xff0c;其实国产FPGA开发工具也可以与Modelsim进行联合仿真&#xff0c;对于设计比较复杂的应用还是非常方便的&am…

创邻科技与浪潮信息KOS完成澎湃技术认证

近日&#xff0c;浙江创邻科技有限公司&#xff08;简称&#xff1a;创邻科技&#xff09;自主研发的Galaxybase图数据库系统与浪潮信息服务器操作系统KOS V5完成澎湃技术认证。创邻科技作为国内首个成熟的商业图数据库供应商&#xff0c;在同类厂商中率先完成认证。测试结果显…

vue3通过render函数实现一个菜单下拉框

背景说明 鼠标移动到产品服务上时&#xff0c;出现标红的下拉框。 使用纯css的方案实现最简单&#xff0c;但是没什么技术含量&#xff0c;弃之&#xff1b;使用第三方组件库&#xff0c;样式定制麻烦弃之。因此&#xff0c;我们使用vue3直接在页面创建一个dom作为下拉框吧。…

【经验分享】Docker容器部署方法说明

前 言 本案例适用开发环境&#xff1a; Windows开发环境&#xff1a;Windows 7 64bit、Windows 10 64bit Linux开发环境&#xff1a;Ubuntu 18.04.4 64bit 虚拟机&#xff1a;VMware15.1.0 Docker是一个开源的应用容器引擎&#xff0c;让开发者可打包他们的应用以及依赖包…

rust持续学习 声明宏

学习记录&#xff0c;都是学自圣经&#xff0c;macrobook啥的 https://doc.rust-lang.org/reference/macros-by-example.html macro_rules! bar {(3) > {println!("3");};(4) > {println!("4");}; }这个是入门例子&#xff0c;有点像match 调用就是…

【Java|多线程与高并发】线程池详解

文章目录 1. 线程池简介2. 创建线程池3. 工厂模式简介4. 线程池的使用5. 实现线程池6. ThreadPoolExecutor的构造方法讲解7. 线程池的线程数量,如何确定? 1. 线程池简介 Java线程池是一种用于管理和重用线程的机制&#xff0c;它可以在需要执行任务时&#xff0c;从线程池中获…

二叉树遍历方法——前、中、后序遍历(java)

二叉树结构&#xff1a; static class TreeNode{public char val;public TreeNode left;public TreeNode right;public TreeNode(char val) {this.val val;}Overridepublic String toString() {return this.val"";}} 一、前序遍历 前序遍历是一种访问二叉树的每一…