RocketMQ5.0.0消息存储<二>_消息存储流程

news2024/11/26 15:42:27

目录

一、消息存储概览

二、Broker接收消息

三、消息存储流程

1. DefaultMessageStore类

2. 存储流程

        1):同步与异步存储

        2):CommitLog异步存储消息

        3):提交消息(Commit)

四、参考资料


一、消息存储概览

        如下图所示,是消息从生产者发送消息到消费者消费消息的大致流程。

  • step1:生产者发送消息到消息存储Broker端;
  • step2:单一文件Commitlog存储所有主题消息,确保顺序写入,提高吞吐量;
  • step3:消息通过堆外缓存,Commit消息写入文件内存映射,然后Flush写入磁盘;
  • step4:消息Flush磁盘后,把消息转发到ConsumeQueue、IndexFile供消费者消费;
  • step5:主题下消费队列内容相同,但是一个消费队列在同一时刻只能被一个消费者消费;
  • step6:消费者根据集群/广播模式、PUSH/PULL模式来消费消息。

        如何实现顺序存储的呢?通过org.apache.rocketmq.store.PutMessageLock接口,在消息追加文件内存映射时,加锁实现存储消息串行

        消息存储模式:同步、异步。默认异步存储,但是无论同步还是异步,最终执行存储方法是org.apache.rocketmq.store.CommitLog#asyncPutMessage(异步执行,提高存储效率),而同步需要等待存储结果才能返回。

        本章主要介绍生产者发送消息,Broker如何接收消息,如何Commit写入文件内存映射,并没有介绍如何刷盘、转发到ConsumeQueue和IndexFile、HA主从同步等内容。

二、Broker接收消息

        org.apache.rocketmq.broker.processor.SendMessageProcessor是生产者发送消息后,Broker接收消息的核心实现类。 发送消息请求码是RequestCode.SEND_MESSAGE。发送消息参考《RocketMQ5.0.0消息发送》。

        org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest不仅处理生产者发来的消息,同时还是处理消费端消费ACK的处理请求。其核心逻辑是sendMessage或sendBatchMessage处理方法,都是Broker端存储消息。

        以下代码是org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage存储之前对消息的预处理。 核心逻辑如下:

  • randomQueueId():发送时是否指定消费队列,若没有指定,则随机选择;
  • handleRetryAndDLQ():消息是否延迟或重试消息,并处理;
  • sendTransactionPrepareMessage变量:判定是否事务消息,true事务消息;
  • 异步存储消息(默认)

                事务消息存储:TransactionalMessageServiceImpl#asyncPrepareMessage

                普通消息存储:DefaultMessageStore#asyncPutMessage

  • 同步存储消息:

                事务消息存储:TransactionalMessageServiceImpl#prepareMessage

                普通消息存储:DefaultMessageStore#putMessage

/**
 * 存储之前,对消息的处理
 * step1:预发送处理,如:检查消息、主题是否符合规范
 * step2:发送消息时,是否指定消费队列,若没有则随机选择
 * step3:消息是否进入重试或延迟队列中(重试次数失败)
 * step4:消息是否是事务消息,若是则存储为prepare消息
 * step5:BrokerConfig#asyncSendEnable是否开启异步存储,默认开启true
 *        (异步存储、同步存储)
 */
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader,
    final TopicQueueMappingContext mappingContext,
    final SendMessageCallback sendMessageCallback) throws RemotingCommandException {

    // 预发送处理,如:检查消息、主题是否符合规范
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    if (response.getCode() != -1) {
        return response;
    }

    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    // 获取消息体
    final byte[] body = request.getBody();

    // 发送消息时,是否指定消费队列,若没有则随机选择
    int queueIdInt = requestHeader.getQueueId();
    // 获取主题配置属性
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (queueIdInt < 0) { // 队列ID不符合,则在写队列随机找个
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    // 消息扩展属性
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    // 消息是否进入重试或延迟队列中(重试次数失败)
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
        return response;
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());

    String uniqKey = oriProps.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    if (uniqKey == null || uniqKey.length() <= 0) {
        uniqKey = MessageClientIDSetter.createUniqID();
        oriProps.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, uniqKey);
    }

    MessageAccessor.setProperties(msgInner, oriProps);
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);

    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

    // Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    // 事务标签
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    boolean sendTransactionPrepareMessage = false;
    if (Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
        // Broker禁止事务消息存储
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        sendTransactionPrepareMessage = true;
    }

    long beginTimeMillis = this.brokerController.getMessageStore().now();

    // 消息是否异步存储
    if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
        CompletableFuture<PutMessageResult> asyncPutMessageFuture;
        if (sendTransactionPrepareMessage) { // 事务prepare操作
            asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }

        final int finalQueueIdInt = queueIdInt;
        final MessageExtBrokerInner finalMsgInner = msgInner;
        asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
            RemotingCommand responseFuture =
                handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
                    ctx, finalQueueIdInt, beginTimeMillis, mappingContext);
            if (responseFuture != null) {
                doResponse(ctx, request, responseFuture);
            }
            sendMessageCallback.onComplete(sendMessageContext, response);
        }, this.brokerController.getPutMessageFutureExecutor());
        // Returns null to release the send message thread
        return null;
    }
    // 消息同步存储
    else {
        PutMessageResult putMessageResult = null;
        // 事务消息存储
        if (sendTransactionPrepareMessage) {
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            // 同步存储消息
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
        sendMessageCallback.onComplete(sendMessageContext, response);
        return response;
    }
}

三、消息存储流程

1. DefaultMessageStore类

        org.apache.rocketmq.store.DefaultMessageStore是消息存储实现类,也是存储模块最重要的一个类,其UML如下。

        其关键属性如下代码所示。同步与异步存储的方法:

  • 同步消息:单个消息putMessage()、批量消息putMessages()
  • 异步消息:单个消息asyncPutMessage()、批量消息asyncPutMessages()
// Commitlog引用次数
public final PerfCounter.Ticks perfs = new PerfCounter.Ticks(LOGGER);

// 存储配置属性
private final MessageStoreConfig messageStoreConfig;
// CommitLog(Commitlog文件存储实现类)
private final CommitLog commitLog;
// ConsumeQueue文件存储实现类
private final ConsumeQueueStore consumeQueueStore;
// 刷盘线程
private final FlushConsumeQueueService flushConsumeQueueService;
// 删除过期Commitlog文件服务
private final CleanCommitLogService cleanCommitLogService;
// 删除过期ConsumeQueue文件服务
private final CleanConsumeQueueService cleanConsumeQueueService;
// 矫正逻辑偏移量服务
private final CorrectLogicOffsetService correctLogicOffsetService;
// index文件实现类
private final IndexService indexService;
// MappedFile分配服务
private final AllocateMappedFileService allocateMappedFileService;
// 消息提交到Commitlog时消息转发,构建ConsumeQueue、index文件服务
private ReputMessageService reputMessageService;
// HA服务(主从同步服务)
private HAService haService;
// 存储状态服务
private final StoreStatsService storeStatsService;
// 堆内存缓存
private final TransientStorePool transientStorePool;

// Broker状态管理
private final BrokerStatsManager brokerStatsManager;
// 消息达到监听器(消息拉取长轮询模式)
private final MessageArrivingListener messageArrivingListener;
// Broker配置属性
private final BrokerConfig brokerConfig;
// 存储刷盘检查点
private StoreCheckpoint storeCheckpoint;
// 定时消息存储实现类
private TimerMessageStore timerMessageStore;
// 日志打印次数
private AtomicLong printTimes = new AtomicLong(0);
// Commitlog文件转发请求
private final LinkedList<CommitLogDispatcher> dispatcherList;


// 延迟消息的延迟级别
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
		new ConcurrentHashMap<Integer, Long>(32);

// 最大延迟级别
private int maxDelayLevel;

2. 存储流程

        1):同步与异步存储

        如下图所示是同步与异步存储的实现方法调用链,它们之间区别与联系:

联系:a. 同步存储实际是调用异步存储方法,即:DefaultMessageStore#asyncPutMessage;

           b. 最终执行存储方法是org.apache.rocketmq.store.CommitLog#asyncPutMessage;

区别:同步存储需要等待存储结果waitForPutResult()

        2):CommitLog异步存储消息

        org.apache.rocketmq.store.CommitLog#asyncPutMessage是异步执行存储消息。如下代码所示,关键步骤如下:

  • step1:mappedFileQueue队列中,获取可写入的Commitlog,即:从commitlog目录下获取当前写入的Commitlog;
  • step2:获取当前写入Commitlog的偏移量 = 文件偏移量 + 该文件写入位置;
  • step3:是否需要HA(主从Broker同步数据);
  • step4:CommitLog.calMsgLength获取该消息的总长度(不定长,总长度存储在前4个字节);
  • step5:加锁后(串行写),判定再次Commitlog文件没有或已被写满,则创建新的Commitlog文件;
  • step6:Commit操作,即:消息缓存或直接(是否开启堆外内存池)追加到Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果,方法:{@link DefaultMappedFile#appendMessage};
  • step7:执行同步或异步刷盘、HA主从同步复制等,方法: {@link CommitLog#handleDiskFlushAndHA}
/**
 * 执行存储消息
 * step1:mappedFileQueue队列中,获取可写入的Commitlog,即:从commitlog目录下获取当前写入的Commitlog;
 * step2:获取当前写入Commitlog的偏移量 = 文件偏移量 + 该文件写入位置;
 * step3:是否需要HA(主从Broker同步数据);
 * step4:CommitLog.calMsgLength获取该消息的总长度(不定长,总长度存储在前4个字节);
 * step5:加锁后(串行写),判定再次Commitlog文件没有或已被写满,则创建新的Commitlog文件;
 * step6:Commit操作,即:消息缓存或直接(是否开启堆外内存池)追加到Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果
 *       {@link DefaultMappedFile#appendMessage}
 * step7:执行同步或异步刷盘、HA主从同步复制等
 *       {@link CommitLog#handleDiskFlushAndHA}
 * @param msg 消息
 * @return 存储结果
 */
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
        msg.setStoreTimestamp(System.currentTimeMillis());
    }

    // Set the message body CRC (consider the most appropriate setting on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();

    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setBornHostV6Flag();
    }

    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setStoreHostAddressV6Flag();
    }

    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    updateMaxMessageSize(putMessageThreadLocal);
    String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg);
    long elapsedTimeInLock = 0;

    // mappedFileQueue队列中,获取可写入的Commitlog,即:从commitlog目录下获取当前写入的Commitlog
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    // 当前写入Commitlog的偏移量
    long currOffset;
    if (mappedFile == null) {
        currOffset = 0;
    } else {
        // 当前写入Commitlog的偏移量 = 文件偏移量 + 该文件写入位置
        currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }

    // 是否需要HA
    int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
    boolean needHandleHA = needHandleHA(msg);

    if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
        if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
        }
        if (this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
            // -1 means all ack in SyncStateSet
            needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
        }
    } else if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
        int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
                this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset));
        needAckNums = calcNeedAckNums(inSyncReplicas);
        if (needAckNums > inSyncReplicas) {
            // Tell the producer, don't have enough slaves to handle the send request
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
        }
    }

    topicQueueLock.lock(topicQueueKey);
    try {

        boolean needAssignOffset = true;
        if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()
                && defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
            needAssignOffset = false;
        }
        if (needAssignOffset) {
            defaultMessageStore.assignOffset(msg, getMessageNum(msg));
        }

        /*
            当前消息编码,计算byteBuf(字节缓存)长度, null则正常处理
            CommitLog.calMsgLength获取该消息的总长度(不定长,总长度存储在前4个字节)
         */
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
        PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);

        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                msg.setStoreTimestamp(beginLockTimestamp); // 消息存储时间戳,确保消息存储有序
            }

            // Commitlog文件没有或已被写满,则创建新的Commitlog文件
            if (null == mappedFile || mappedFile.isFull()) {
                // 创建新的Commitlog文件
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
            }

            // Commitlog文件写入消息(Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果)
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) {
                case PUT_OK:
                    onCommitLogAppend(msg, result, mappedFile);
                    break;
                case END_OF_FILE:
                    onCommitLogAppend(msg, result, mappedFile);
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
                        onCommitLogAppend(msg, result, mappedFile);
                    }
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }
    } finally {
        topicQueueLock.unlock(topicQueueKey);
    }

    // putMessage加锁超时
    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

    /*
        执行同步或异步刷盘、HA主从同步复制等
        注意:MappedFile.appendMessage只是将消息追加到Commitlog文件内存映射buffer中,并没有刷写到磁盘,则返回结果
     */
    return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
}

        3):提交消息(Commit)

        org.apache.rocketmq.store.logfile.DefaultMappedFile#appendMessage是文件内存映射追加消息方法,目的是把堆外缓存池消息或直接Commit到文件内存映射,其调用链如下。

        org.apache.rocketmq.store.logfile.DefaultMappedFile#appendMessagesInner是追加消息到文件内存映射的核心方法,如下代码所示。

/**
 * Commit操作,即:消息缓存或直接(是否开启堆外内存池)追加到Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果
 * step1:当前Commitlog的写指针,判断文件是否写满;
 * step2:slice():创建与原ByteBuffer共享的内存区,拥有独立的position、limit、capacity等指针,并设置position当前写指针
 * step3:判断是否是批量消息,并追加消息到Commitlog文件内存映射buffer,并没有刷写到磁盘,则返回结果
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
                                               PutMessageContext putMessageContext) {
    assert messageExt != null;
    assert cb != null;

    // 当前Commitlog的写指针
    int currentPos = WROTE_POSITION_UPDATER.get(this);

    // 未写满,则追加
    if (currentPos < this.fileSize) {
        /*
            slice():创建与原ByteBuffer共享的内存区,拥有独立的position、limit、capacity等指针
            并设置position当前写指针
         */
        ByteBuffer byteBuffer = appendMessageBuffer().slice();
        byteBuffer.position(currentPos);

        AppendMessageResult result;
        // 批量消息
        if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
            // traditional batch message
            // Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBatch) messageExt, putMessageContext);
        }
        // 单个消息
        else if (messageExt instanceof MessageExtBrokerInner) {
            // traditional single message or newly introduced inner-batch message
            // Commitlog文件内存映射buffer,追加当前消息,并没有刷写到磁盘,则返回结果
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

        需要注意的是:appendMessageBuffer().slice(),创建与原ByteBuffer共享的内存区,拥有独立的position、limit、capacity等指针,并设置position当前写指针。创建的内存追加消息

四、参考资料

Rocket Mq消息持久化_飞科-程序人生的博客-CSDN博客

百度安全验证

【RocketMQ】同一个项目中,同一个topic,可以存在多个消费者么? - N!CE波 - 博客园

RocketMQ5.0.0消息发送_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0路由中心NameServer_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0消息存储<一>_存储文件及内存映射_爱我所爱0505的博客-CSDN博客 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/333052.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MySQL】MySQL 8.0 新特性之 - 公用表表达式(CTE)

MySQL 8.0 新特性之 - 公用表表达式&#xff08;CTE&#xff09;1. 公用表表达式&#xff08;CTE&#xff09; - WITH 介绍1.1 公用表表表达式1.1.1 什么是公用表表达式1.1.2 CTE 语法1.1.3 CTE示例1.3 递归 CTE1.3.1 递归 CTE 简介1.3.2 递归成员限制1.3.3 递归 CTE 示例1.3.4…

VSCode编译及调试NRF52

VSCode下载 下载地址 下载arm-eabi-gcc10.3.1&#xff0c;mingw64-gcc12.2.0&#xff0c;nRF5x-Command-Line-Tools_9_6_0_Installer等安装文件 链接&#xff1a;https://pan.baidu.com/s/1iuVIkd3GAiUb3qGgF-ecPg 提取码&#xff1a;2d69 安装arm-eabi-gcc10.3.1设置环境变…

接口自动化实战-postman

1.测试模型 单元测试并非测试工程师的本职工作&#xff0c;它属于开发工程师的工作&#xff0c;开发进行单元测试的情况我们不知道&#xff0c;为了确保系统尽可能没有Bug&#xff0c;于是接口测试在测试工程师这里就变得由为重要了。实际工作中为菱形模型。 接口测试能更早的…

CentOS8基础篇1:VMware 安装CentOS8 教程

一、准备工作 1.准备一台服务器 1&#xff09;下载VMware https://jssoft.bangtengxinxi.com/vmware/index360.html?source360a&unitid1643144282&unitvmware&e_creative8925351127&qhclickid06762bb36ff752c6 2.准备CentOS8 系统盘 1&#xff09;CentOS8官…

身份证号码正则表达式详解

1、结构&#xff1a; 公民身份号码是特征组合码&#xff0c;由十七位数字本体码和一位校验码组成。排列顺序从左至右依次为&#xff1a;六位数字地址码&#xff0c;八位数字出生日期码&#xff0c;三位数字顺序码和一位数字校验码。 地址码&#xff1a;&#xff08;身份证号码…

基于图的下一代入侵检测系统

青藤云安全是一家主机安全独角兽公司&#xff0c;看名字就知道当前很大一块方向专注云原生应用安全&#xff0c;目前主营的是主机万相/容器蜂巢产品&#xff0c;行业领先&#xff0c;累计支持 800万 Agent。当前公司基于 NebulaGraph 结合图技术开发的下一代实时入侵检测系统已…

[数据库]初识数据库

●&#x1f9d1;个人主页:你帅你先说. ●&#x1f4c3;欢迎点赞&#x1f44d;关注&#x1f4a1;收藏&#x1f496; ●&#x1f4d6;既选择了远方&#xff0c;便只顾风雨兼程。 ●&#x1f91f;欢迎大家有问题随时私信我&#xff01; ●&#x1f9d0;版权&#xff1a;本文由[你帅…

[C语言]offseto宏的认识与模拟实现

目录 1.offseto的认识 2.offseto的模拟实现 1.offseto的认识 在结构体中&#xff0c;因为内存对齐的存在我们需要进行计算才能知道结构体成员的地址对于结构体首地址的偏移量为多少&#xff0c;今天认识的offseto就是将结构体中成员地址对于结构体首地址的偏移量的大小所计算…

静态代理,JDK动态代理,Cglib动态代理的写法

目录静态代理JDK动态代理Cglib动态代理小结使用代理模式可以在不改变被代理类的情况下&#xff0c;实现对被代理类的进行增强。 例如&#xff1a;在被代理类中有一个div()方法&#xff0c;方法中只是计算除法&#xff0c;然而想要对其进行增强&#xff0c;如添加异常捕获等。 …

Element UI框架学习篇(一)

Element UI框架学习篇(一) 1.准备工作 1.1 下载好ElementUI所需要的文件 ElementUI官网 1.2 插件的安装 1.2.1 更改标签的时实现自动修改 1.2.2 element UI提示插件 1.3 使用ElementUI需要引入的文件 <link rel"stylesheet" href"../elementUI/element…

【JavaScript】理解面向对象以及构造函数的推导

&#x1f4bb; 【JavaScript】理解面向对象以及构造函数的推导 &#x1f3e0;专栏&#xff1a;JavaScript &#x1f440;个人主页&#xff1a;繁星学编程&#x1f341; &#x1f9d1;个人简介&#xff1a;一个不断提高自我的平凡人&#x1f680; &#x1f50a;分享方向&#xf…

面试题:Redis网络模型

1 用户空间和内核空间以Centos 7 linux操作系统为例。计算机系统被内核操控&#xff0c; 内核被应用操控。为了避免用户应用导致冲突甚至内核崩溃&#xff0c;用户应用与内核是分离的进程的寻址空间会划分为两部分:内核空间、用户空间。用户空间只能执行受限的命令(Rin3&#x…

解决Mac 安装应用提示:xx已损坏,无法打开。 您应该将它移到废纸篓问题

许多新手mac 用户安装应用得时候会出现 “已损坏&#xff0c;无法打开。您应该将它移到废纸娄” 导致无法正常安装&#xff0c;其实应用软件b并没有损坏&#xff0c;只是系统安全设置&#xff0c;我们改如何解决呢&#xff1f; 1、开启允许任何来源 苹果已经取消了允许“任何…

以数据驱动管理场景,低代码助力转型下一站

数据驱动 数据驱动&#xff0c;是通过移动互联网或者其他的相关软件为手段采集海量的数据&#xff0c;将数据进行组织形成信息&#xff0c;之后对相关的信息讲行整合和提炼&#xff0c;在数据的基础上经过训练和拟合形成自动化的决策模型&#xff0c;简单来说&#xff0c;就是…

Python项目实战——外汇牌价(附源码)

前言 几乎每个人都在使用银行卡&#xff0c;今天我们就来爬取某行外汇牌价&#xff0c;获取我们想要的数据。 环境使用 python 3.9pycharm 模块使用 requests 模块介绍 requestsrequests是一个很实用的Python HTTP客户端库&#xff0c;爬虫和测试服务器响应数据时经常会用到&…

5分钟搞懂 强缓存与协商缓存

Ⅰ、http缓存 HTTP 缓存策略 分为 > 「强制缓存」 和 「协商缓存」 为什么需要 HTTP 缓存 呢 ? &#x1f447; 直接使用缓存速度 >> 远比重新请求快 缓存对象有那些呢 &#xff1f;&#x1f447; 「图片」 「JS文件」 「CSS文件」 等等 文章目录Ⅰ、http缓存Ⅱ…

震惊!邻桌的程序猿做可视化报告竟然比我还快,带着好奇心我打开了他的电脑,发现惊天秘密,原因竟是...

其实&#xff0c;本文就是想分享一个做可视化的捷径&#xff01; 制作可视化的方式有千千万。 Excel 控若能轻车熟路驾驭 VBA&#xff0c;能玩出各种花来&#xff0c;再不济借助图表插件外援也能秒杀一众小白选 手。 会编程的&#xff0c;Echarts 几十行代码&#xff0c;分分…

Flink反压如何排查

Flink反压利用了网络传输和动态限流。Flink的任务的组成由流和算子组成&#xff0c;那么流中的数据在算子之间转换的时候&#xff0c;会放入分布式的阻塞队列中。当消费者的阻塞队列满的时候&#xff0c;则会降低生产者的处理速度。 如上图所示&#xff0c;当Task C 的数据处…

nuxt 学习笔记

这里写目录标题路由跳转NuxtLinkquery参数params参数嵌套路由tab切换效果layouts 文件夹强制约定放置所有布局文件&#xff0c;并以插槽的形式作用在页面中1.在app.vue里面2.component 组件使用Vue < component :is"">Vuex生命周期数据请求useFetchuseAsyncDat…

鸿蒙设备学习|快速上手BearPi-HM Micro开发板

系列文章目录 第一章 鸿蒙设备学习|初识BearPi-HM Micro开发板 第二章 鸿蒙设备学习|快速上手BearPi-HM Micro开发板 文章目录系列文章目录前言一、环境要求1.硬件要求2.软件要求3.Linux构建工具要求4.Windows开发工具要求5.工具下载地址二、安装编译基础环境1.安装Linux编译环…