# 消息中间件 RocketMQ 高级功能和源码分析(七)

news2025/1/22 15:59:39

消息中间件 RocketMQ 高级功能和源码分析(七)

一、 消息中间件 RocketMQ 源码分析:消息存储核心类介绍

1、消息存储在 store 模块中。消息存储核心类 DefaultMessageStore.java

在这里插入图片描述

2、消息存储核心类介绍


private final MessageStoreConfig messageStoreConfig;	//消息配置属性
private final CommitLog commitLog;		//CommitLog文件存储的实现类
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;	//消息队列存储缓存表,按照消息主题分组
private final FlushConsumeQueueService flushConsumeQueueService;	//消息队列文件刷盘线程
private final CleanCommitLogService cleanCommitLogService;	//清除CommitLog文件服务
private final CleanConsumeQueueService cleanConsumeQueueService;	//清除ConsumerQueue队列文件服务
private final IndexService indexService;	//索引实现类
private final AllocateMappedFileService allocateMappedFileService;	//MappedFile分配服务
private final ReputMessageService reputMessageService;//CommitLog消息分发,根据CommitLog文件构建ConsumerQueue、IndexFile文件
private final HAService haService;	//存储HA机制
private final ScheduleMessageService scheduleMessageService;	//消息服务调度线程
private final StoreStatsService storeStatsService;	//消息存储服务
private final TransientStorePool transientStorePool;	//消息堆外内存缓存
private final BrokerStatsManager brokerStatsManager;	//Broker状态管理器
private final MessageArrivingListener messageArrivingListener;	//消息拉取长轮询模式消息达到监听器
private final BrokerConfig brokerConfig;	//Broker配置类
private StoreCheckpoint storeCheckpoint;	//文件刷盘监测点
private final LinkedList<CommitLogDispatcher> dispatcherList;	//CommitLog文件转发请求

二、 消息中间件 RocketMQ 源码分析:消息存储流程

1、消息存储流程 示例图:

在这里插入图片描述

2、 消息存储入口:DefaultMessageStore#putMessage

//判断Broker角色如果是从节点,则无需写入
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }

    return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}

//判断当前写入状态如果是正在写入,则不能继续
if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
    	return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
    this.printTimes.set(0);
}
//判断消息主题长度是否超过最大限制
if (msg.getTopic().length() > Byte.MAX_VALUE) {
    log.warn("putMessage message topic length too long " + msg.getTopic().length());
    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
//判断消息属性长度是否超过限制
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
    log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
    return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
//判断系统PageCache缓存去是否占用
if (this.isOSPageCacheBusy()) {
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}

//将消息写入CommitLog文件
PutMessageResult result = this.commitLog.putMessage(msg);

3、 代码:CommitLog#putMessage

//记录消息存储时间
msg.setStoreTimestamp(beginLockTimestamp);

//判断如果mappedFile如果为空或者已满,创建新的mappedFile文件
if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
}
//如果创建失败,直接返回
if (null == mappedFile) {
    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
    beginTimeInLock = 0;
    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}

//写入消息到mappedFile中
result = mappedFile.appendMessage(msg, this.appendMessageCallback);

4、 代码:MappedFile#appendMessagesInner

//获得文件的写入指针
int currentPos = this.wrotePosition.get();

//如果指针大于文件大小则直接返回
if (currentPos < this.fileSize) {
    //通过writeBuffer.slice()创建一个与MappedFile共享的内存区,并设置position为当前指针
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result = null;
    if (messageExt instanceof MessageExtBrokerInner) {
       	//通过回调方法写入
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    this.wrotePosition.addAndGet(result.getWroteBytes());
    this.storeTimestamp = result.getStoreTimestamp();
    return result;
}

5、 代码:CommitLog#doAppend

//文件写入位置
long wroteOffset = fileFromOffset + byteBuffer.position();
//设置消息ID
this.resetByteBuffer(hostHolder, 8);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

//获得该消息在消息队列中的偏移量
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
    queueOffset = 0L;
    CommitLog.this.topicQueueTable.put(key, queueOffset);
}

//获得消息属性长度
final byte[] propertiesData =msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

if (propertiesLength > Short.MAX_VALUE) {
    log.warn("putMessage message properties length too long. length={}", propertiesData.length);
    return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}

//获得消息主题大小
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;

//获得消息体大小
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
//计算消息总长度
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

6、 代码:CommitLog#calMsgLength

protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
    final int msgLen = 4 //TOTALSIZE
        + 4 //MAGICCODE  
        + 4 //BODYCRC
        + 4 //QUEUEID
        + 4 //FLAG
        + 8 //QUEUEOFFSET
        + 8 //PHYSICALOFFSET
        + 4 //SYSFLAG
        + 8 //BORNTIMESTAMP
        + 8 //BORNHOST
        + 8 //STORETIMESTAMP
        + 8 //STOREHOSTADDRESS
        + 4 //RECONSUMETIMES
        + 8 //Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
        + 1 + topicLength //TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
        + 0;
    return msgLen;
}

7、 代码:CommitLog#doAppend

//消息长度不能超过4M
if (msgLen > this.maxMessageSize) {
    CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
        + ", maxMessageSize: " + this.maxMessageSize);
    return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}

//消息是如果没有足够的存储空间则新创建CommitLog文件
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
    this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(maxBlank);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
    // 3 The remaining space may be any value
    // Here the length of the specially set maxBlank
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}

//将消息存储到ByteBuffer中,返回AppendMessageResult
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, 
                                                     msgLen, msgId,msgInner.getStoreTimestamp(), 
                                                     queueOffset, 
                                                     CommitLog.this.defaultMessageStore.now() 
                                                     -beginTimeMills);
switch (tranType) {
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        //更新消息队列偏移量
        CommitLog.this.topicQueueTable.put(key, ++queueOffset);
        break;
    default:
        break;
}

8、 代码:CommitLog#putMessage

//释放锁
putMessageLock.unlock();
//刷盘
handleDiskFlush(result, putMessageResult, msg);
//执行HA主从同步
handleHA(result, putMessageResult, msg);

三、 消息中间件 RocketMQ 源码分析:消息存储文件介绍

1、消息存储文件结构图:

在这里插入图片描述

2、消息存储文件介绍:

  • commitLog:消息存储目录
  • config:运行期间一些配置信息
  • consumerqueue:消息消费队列存储目录
  • index:消息索引文件存储目录
  • abort:如果存在改文件寿命 Broker 非正常关闭
  • checkpoint:文件检查点,存储 CommitLog 文件最后一次刷盘时间戳、consumerquueue 最后一次刷盘时间,index 索引文件最后一次刷盘时间戳。

四、 消息中间件 RocketMQ 源码分析:存储文件内存映射-MappedFileQueue

1、消息 存储文件内存映射

RocketMQ 通过使用内存映射文件提高 IO 访问性能,无论是 CommitLog、ConsumerQueue 还是 IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。

2、消息 存储文件内存映射-MappedFileQueue

在这里插入图片描述

String storePath;	//存储目录
int mappedFileSize;	// 单个文件大小
CopyOnWriteArrayList<MappedFile> mappedFiles;	//MappedFile文件集合
AllocateMappedFileService allocateMappedFileService;	//创建MapFile服务类
long flushedWhere = 0;		//当前刷盘指针
long committedWhere = 0;	//当前数据提交指针,内存中ByteBuffer当前的写指针,该值大于等于flushWhere

3、根据存储时间查询 MappedFile


public MappedFile getMappedFileByTime(final long timestamp) {
    Object[] mfs = this.copyMappedFiles(0);
	
    if (null == mfs)
        return null;
	//遍历MappedFile文件数组
    for (int i = 0; i < mfs.length; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];
        //MappedFile文件的最后修改时间大于指定时间戳则返回该文件
        if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
            return mappedFile;
        }
    }

    return (MappedFile) mfs[mfs.length - 1];
}

4、根据消息偏移量 offset 查找 MappedFile


public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        //获得第一个MappedFile文件
        MappedFile firstMappedFile = this.getFirstMappedFile();
        //获得最后一个MappedFile文件
        MappedFile lastMappedFile = this.getLastMappedFile();
        //第一个文件和最后一个文件均不为空,则进行处理
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || 
                offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
            } else {
                //获得文件索引
                int index = (int) ((offset / this.mappedFileSize) 
                                   - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    //根据索引返回目标文件
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }

                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }

                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }

            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }

    return null;
}

5、获取存储文件最小偏移量


public long getMinOffset() {

    if (!this.mappedFiles.isEmpty()) {
        try {
            return this.mappedFiles.get(0).getFileFromOffset();
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getMinOffset has exception.", e);
        }
    }
    return -1;
}

6、获取存储文件最大偏移量


public long getMaxOffset() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
    }
    return 0;
}

7、返回存储文件当前写指针


public long getMaxWrotePosition() {
    MappedFile mappedFile = getLastMappedFile();
    if (mappedFile != null) {
        return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }
    return 0;
}

五、 消息中间件 RocketMQ 源码分析:存储文件内存映射-MappedFile

1、消息 存储文件内存映射-MappedFile

MappedFile 示例图:

在这里插入图片描述

2、消息 存储文件内存映射-MappedFile 文件介绍:


int OS_PAGE_SIZE = 1024 * 4;		//操作系统每页大小,默认4K
AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);	//当前JVM实例中MappedFile虚拟内存
AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);	//当前JVM实例中MappedFile对象个数
AtomicInteger wrotePosition = new AtomicInteger(0);	//当前文件的写指针
AtomicInteger committedPosition = new AtomicInteger(0);	//当前文件的提交指针
AtomicInteger flushedPosition = new AtomicInteger(0);	//刷写到磁盘指针
int fileSize;	//文件大小
FileChannel fileChannel;	//文件通道	
ByteBuffer writeBuffer = null;	//堆外内存ByteBuffer
TransientStorePool transientStorePool = null;	//堆外内存池
String fileName;	//文件名称
long fileFromOffset;	//该文件的处理偏移量
File file;	//物理文件
MappedByteBuffer mappedByteBuffer;	//物理文件对应的内存映射Buffer
volatile long storeTimestamp = 0;	//文件最后一次内容写入时间
boolean firstCreateInQueue = false;	//是否是MappedFileQueue队列中第一个文件

3、 MappedFile 初始化

  • 未开启transientStorePoolEnabletransientStorePoolEnable=truetrue表示数据先存储到堆外内存,然后通过Commit线程将数据提交到内存映射Buffer中,再通过Flush线程将内存映射Buffer中数据持久化磁盘。
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
	
    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel " + this.fileName + " Failed. ", e);
        throw e;
    } catch (IOException e) {
        log.error("map file " + this.fileName + " Failed. ", e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

4、 开启transientStorePoolEnable


public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    this.writeBuffer = transientStorePool.borrowBuffer();	//初始化writeBuffer
    this.transientStorePool = transientStorePool;
}

5、 MappedFile 提交

提交数据到 FileChannel,commitLeastPages 为本次提交最小的页数,如果待提交数据不满 commitLeastPages,则不执行本次提交操作。如果 writeBuffer 如果为空,直接返回 writePosition 指针,无需执行 commit 操作,表名 commit 操作主体是 writeBuffer。


public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    //判断是否满足提交条件
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // 所有数据提交后,清空缓冲区
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

6、 MappedFile#isAbleToCommit

判断是否执行 commit 操作,如果文件已满返回 true;如果 commitLeastpages 大于 0,则比较 writePosition 与上一次提交的指针 commitPosition 的差值,除以 OS_PAGE_SIZE 得到当前脏页的数量,如果大于 commitLeastPages 则返回 true,如果 commitLeastpages 小于0表示只要存在脏页就提交。


protected boolean isAbleToCommit(final int commitLeastPages) {
    //已经刷盘指针
    int flush = this.committedPosition.get();
    //文件写指针
    int write = this.wrotePosition.get();
	//写满刷盘
    if (this.isFull()) {
        return true;
    }

    if (commitLeastPages > 0) {
        //文件内容达到commitLeastPages页数,则刷盘
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
    }

    return write > flush;
}

7、 MappedFile#commit0

具体提交的实现,首先创建 WriteBuffer 区共享缓存区,然后将新创建的 position 回退到上一次提交的位置(commitPosition),设置 limit 为 wrotePosition(当前最大有效数据指针),然后把 commitPosition 到 wrotePosition 的数据写入到 FileChannel 中,然后更新 committedPosition 指针为 wrotePosition。commit 的作用就是将 MappedFile 的 writeBuffer 中数据提交到文件通道 FileChannel 中。


protected void commit0(final int commitLeastPages) {
    //写指针
    int writePos = this.wrotePosition.get();
    //上次提交指针
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            //复制共享内存区域
            ByteBuffer byteBuffer = writeBuffer.slice();
            //设置提交位置是上次提交位置
            byteBuffer.position(lastCommittedPosition);
            //最大提交数量
            byteBuffer.limit(writePos);
            //设置fileChannel位置为上次提交位置
            this.fileChannel.position(lastCommittedPosition);
            //将lastCommittedPosition到writePos的数据复制到FileChannel中
            this.fileChannel.write(byteBuffer);
            //重置提交位置
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

8、 MappedFile#flush

刷写磁盘,直接调用 MappedByteBuffer 或 fileChannel 的 force 方法将内存中的数据持久化到磁盘,那么 flushedPosition 应该等于 MappedByteBuffer 中的写指针;如果 writeBuffer不为空,则 flushPosition 应该等于上一次的 commit指针;因为上一次提交的数据就是进入到 MappedByteBuffer 中的数据;如果 writeBuffer 为空,数据时直接进入到 MappedByteBuffer,wrotePosition 代表的是 MappedByteBuffer 中的指针,故设置 flushPosition 为 wrotePosition。

在这里插入图片描述

public int flush(final int flushLeastPages) {
    //数据达到刷盘条件
    if (this.isAbleToFlush(flushLeastPages)) {
        //加锁,同步刷盘
        if (this.hold()) {
            //获得读指针
            int value = getReadPosition();
            try {
                //数据从writeBuffer提交数据到fileChannel再刷新到磁盘
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    //从mmap刷新数据到磁盘
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
			//更新刷盘位置
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

9、 MappedFile#getReadPosition

获取当前文件最大可读指针。如果 writeBuffer 为空,则直接返回当前的写指针;如果 writeBuffer 不为空,则返回上一次提交的指针。在 MappedFile 设置中,只有提交了的数据(写入到 MappedByteBuffer 或 FileChannel 中的数据)才是安全的数据


public int getReadPosition() {
    //如果writeBuffer为空,刷盘的位置就是应该等于上次commit的位置,如果为空则为mmap的写指针
    return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}

10、 MappedFile#selectMappedBuffer

查找 pos 到当前最大可读之间的数据,由于在整个写入期间都未曾改 MappedByteBuffer 的指针,如果 mappedByteBuffer.slice()方法返回的共享缓存区空间为整个 MappedFile,然后通过设置 ByteBuffer 的position 为待查找的值,读取字节长度当前可读最大长度,最终返回的 ByteBuffer的limit 为 size。整个共享缓存区的容量为(MappedFile#fileSize-pos)。故在操作 SelectMappedBufferResult 不能对包含在里面的 ByteBuffer 调用 filp 方法。


public SelectMappedBufferResult selectMappedBuffer(int pos) {
    //获得最大可读指针
    int readPosition = getReadPosition();
    //pos小于最大可读指针,并且大于0
    if (pos < readPosition && pos >= 0) {
        if (this.hold()) {
            //复制mappedByteBuffer读共享区
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            //设置读指针位置
            byteBuffer.position(pos);
            //获得可读范围
            int size = readPosition - pos;
            //设置最大刻度范围
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        }
    }

    return null;
}

11、 MappedFile#shutdown

MappedFile 文件销毁的实现方法为 public boolean destory(long intervalForcibly),intervalForcibly 表示拒绝被销毁的最大存活时间。


public void shutdown(final long intervalForcibly) {
    if (this.available) {
        //关闭MapedFile
        this.available = false;
        //设置当前关闭时间戳
        this.firstShutdownTimestamp = System.currentTimeMillis();
        //释放资源
        this.release();
    } else if (this.getRefCount() > 0) {
        if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
            this.refCount.set(-1000 - this.getRefCount());
            this.release();
        }
    }
}

六、 消息中间件 RocketMQ 源码分析:存储文件内存映射-TransientStorePool

1、消息 存储文件内存映射-TransientStorePool 介绍:

短暂的存储池。RocketMQ 单独创建一个 MappedByteBuffer 内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由 commit 线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ 引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。

2、 TransientStorePool 示例图:

在这里插入图片描述

3、 TransientStorePool 代码:


private final int poolSize;		//availableBuffers个数
private final int fileSize;		//每隔ByteBuffer大小
private final Deque<ByteBuffer> availableBuffers;	//ByteBuffer容器。双端队列

4、 TransientStorePool 初始化


public void init() {
    //创建poolSize个堆外内存
    for (int i = 0; i < poolSize; i++) {
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        //使用com.sun.jna.Library类库将该批内存锁定,避免被置换到交换区,提高存储性能
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

        availableBuffers.offer(byteBuffer);
    }
}

上一节关联链接请点击:
# 消息中间件 RocketMQ 高级功能和源码分析(六)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1842443.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

汇编程序入门指南

什么是机器语言&#xff1f; 机器语言就是由二进制数字构成的程序&#xff0c;CPU 可以直接对其解释、执行。 汇编语言、C 语言、Java、BASIC 等编程语言编写的程序&#xff0c;也都需要先转换成机器语言才能被执行。机器语言有时也叫作“原生代码”&#xff08;Native Code&…

无线麦克风推荐哪些品牌,热门领夹无线麦克风哪个好,看本期文章

​在信息爆炸的今天&#xff0c;高品质的无线领夹麦克风能让声音更清晰响亮。技术发展带来多样化选择同时也带来选择困难。根据多年使用经验和行业反馈&#xff0c;我推荐一系列可靠、易用且性价比高的无线领夹麦克风&#xff0c;助你作出明智选择。还要不知道该怎么选无线领夹…

Day7—zookeeper基本操作

ZooKeeper介绍 ZooKeeper&#xff08;动物园管理员&#xff09;是一个分布式的、开源的分布式应用程序的协调服务框架&#xff0c;简称zk。ZooKeeper是Apache Hadoop 项目下的一个子项目&#xff0c;是一个树形目录服务。 ZooKeeper的主要功能 配置管理 分布式锁 集群管理…

【C++LeetCode】【热题100】两数相加【中等】-不同效率的题解【1】

题目&#xff1a; 暴力方法&#xff1a; /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNo…

模拟算法:代码世界的生活模拟器

✨✨✨学习的道路很枯燥&#xff0c;希望我们能并肩走下来! 文章目录 目录 文章目录 前言 一. 模拟算法的总结 二. 模拟算法题目 2.1 替换所有的问号 2.2 提莫攻击 2.3 Z字形变换 2.4 外观数列 2.5 数青蛙 总结 前言 本篇详细介绍了模拟算法的使用&#xff0c;让…

Word 文本框技巧2则

1 调整大小 一种方法是&#xff0c;选中文本框&#xff0c;周围出现锚点&#xff0c;然后用鼠标拖动来调整大小&#xff1b; 精确按数值调整&#xff0c;在 格式 菜单下有多个分栏&#xff0c;一般最后一个分栏是 大小 &#xff1b;在此输入高度和宽度的数值&#xff0c;来调整…

万能DIY预约小程序源码系统 适合任何行业在线预约报名 带完整的安装代码包以及搭建教程

系统概述 在当今数字化时代&#xff0c;线上预约和报名系统已经成为各行各业不可或缺的工具。为了满足市场需求&#xff0c;万能 DIY 预约小程序源码系统应运而生&#xff0c;它为各类企业和组织提供了一种便捷、高效、灵活的解决方案&#xff0c;可适用于任何行业的在线预约和…

3D营销可以应用于哪些领域?

着科技的飞速发展&#xff0c;3D营销技术正逐渐为各行各业带来前所未有的机遇与挑战&#xff0c;特别是在电商、汽车、数码家电、家居、时尚、教育、制造等领域&#xff0c;其应用愈发广泛。 1. 汽车行业 3D营销为汽车行业打破了时空的界限&#xff0c;构建了逼真的虚拟展厅。…

手把手教程 | 云端部署语音合成神器——ChatTTS

近期&#xff0c;ChatTTS 凭借其高度仿真的 AI 语音合成技术迅速走红&#xff01;ChatTTS 是专为对话场景设计的文本转语音模型&#xff0c;例如 LLM 助手对话任务&#xff0c;支持中英文两种语言。其最大的模型在超过 10 万小时的中英文数据上进行训练&#xff0c;确保了高质量…

来都来了,8个JavaScript技巧奉上

吆喝一声&#xff0c;如果你计算机、软件工程、电子等相关专业本科及以上学历&#xff0c;欢迎来共事。前后端/测试可投&#xff0c;技术大厂。 JavaScript 作为最流行的语言之一&#xff0c;其语法灵活且每年都在不断吸纳新特性&#xff0c;即使是一个从业多年的老手&#xff…

深圳信用贷款之路:申请了10次都被拒!这三步帮你逆袭银行贷款!

贷款客户最头疼的就是明明查询了一堆资料&#xff0c;贷款还是办不下来&#xff01;尤其是那些负债累累的&#xff0c;急需资金还月供和本金的朋友们&#xff0c;不是在贷款就是在贷款的路上&#xff0c;一个月申请了10次都被拒&#xff01;去了好几家贷款机构&#xff0c;费用…

手机制造计划调度场景下的复杂约束

获取更多资讯,赶快关注上面的公众号吧! 文章目录 手机制造过程大致分为SMT、板测、主板预加工、预组、组装、整测、包装等7大工段,每个工段包含一条或多条线体,根据项目要求和线体配置的差异,项目选择线体的适配度(优先级)不同,而且不同产品的工艺流程可能存在差异,共…

Hedra:让您的照片说话

在数字内容创作的世界里&#xff0c;我们总是在寻找那些能够让我们的作品更加生动和吸引人的工具。Hedra软件就是这样一款工具&#xff0c;它能够让您的照片动起来&#xff0c;甚至说话。想象一下&#xff0c;您的家庭相册中的照片突然变得栩栩如生&#xff0c;或者您的产品图片…

搭建Vue的环境

目录 # 开篇 步骤一&#xff0c;准备Vue 的环境 步骤二&#xff0c;下载Vue.js的包 步骤三&#xff0c;创建并打开写前端代码的文件夹 步骤四&#xff0c;在VSCode中引入Vue.js的包 步骤五&#xff0c;创建第一个vue.html Vue其他知识 Vue.config命令 # 开篇 介绍&…

朝阳医院2018年销售数据 数据分析与可视化

代码及数据集下载传送门 数据分析与可视化-朝阳医院2018销售数据-ipynbcsv 实践内容 以朝阳医院2018年销售数据为例&#xff0c;目的是了解朝阳医院在2018年里的销售情况&#xff0c;这就需要知道几个业务指标&#xff0c;本次的分析目标是从销售数据中分析出以下业务指标&am…

kafka的基本模型

kafka官网 线程和线程之间的数据交互 在jvm里不同的线程有自己的栈内存&#xff0c;但彼此之间交互可以在共享的内存中进行&#xff0c;即堆内存&#xff0c;堆内存会将这些消息放到队列中&#xff0c;具体实现jvm见&#xff0c;栈内存各自维护&#xff0c;堆内存大家共享 进…

【SD3辅助工具推荐】InstantX发布了三种SD3专属的ControlNet模式——Pose、Canny和Tile

InstantX 是一家专注于人工智能内容生成的独立研究机构。此前&#xff0c;曾开源著名的InstantID和论文《InstantID : Zero-shot Identity-Preserving Generation in Seconds》。随着本月12号&#xff0c;Stability AI正式开源了其产品 Stable Diffusion 3&#xff0c;这家机构…

开发板连接WiFi+开发板配置动态/静态IP

一、开发板连接WiFi 1、OTG线连接&#xff0c;使用adb进入开发板命令行。 2、使用下面指令来连接 wifi: connmanctl //进入 WIIF操作终端 connmanctl> enable wifi //使能 WIFI3、继续进入 connmanctl操作终端 connmanctl> scan wifi //开启 WIFI扫描&#xff0c;可以…

深度学习算法informer(时序预测)(三)(Encoder)

一、EncoderLayer架构如图&#xff08;不改变输入形状&#xff09; 二、ConvLayer架构如图&#xff08;输入形状中特征维度减半&#xff09; 三、Encoder整体 包括三部分 1. 多层EncoderLayer 2. 多层ConvLayer 3. 层归一化 代码如下 class AttentionLayer(nn.Module):de…

前端安全——最新:lodash原型漏洞从发现到修复全过程

前端安全——最新&#xff1a;lodash原型漏洞从发现到修复全过程 1. 漏洞复现 现在很多系统的前端都是基于vue和react框架的&#xff0c;所以就肯定少不了引入各种依赖&#xff0c;而lodash作为一款非常流行的npm库&#xff0c;每月的下载量超过8000万次。可以说是使用的十分…