根据消息ID来查找消息,consumequeue 中没有存储消息ID,如果不采取其他措施,又得遍历 commitlog文件了,indexFile就是为了解决这个问题的文件。
如图所示:
- 一个indexFile对应一个40个字节的IndexHead。
- (40,2000 0000]区间代表存放4个字节的index条目数。
- (2000 0000,4 0000 0000]区间存放的是20个字节长度的具体的Index条目数据。
- 一个IndexFile文件大小约为400M左右。
1.IndexFile
为了便于分析将DirectByteBuffer等同于HeapByteBuffer解析。
/**
*
* @param fileName
* @param hashSlotNum 500 0000
* @param indexNum 4 * 500 0000
* @param endPhyOffset
* @param endTimestamp
* @throws IOException
*/
public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,final long endPhyOffset, final
long endTimestamp) throws IOException {
// todo fileTotalSize = 40个字节的文件头 + 4字节共500w个的槽表 + 20字节2000w记录索引链表
this.fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
// 内存映射缓冲区大小约为 400M
this.mappedFile = new DefaultMappedFile(fileName, fileTotalSize);
this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
this.hashSlotNum = hashSlotNum;
this.indexNum = indexNum;
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
this.indexHeader = new IndexHeader(byteBuffer);
if (endPhyOffset > 0) {
this.indexHeader.setBeginPhyOffset(endPhyOffset);
this.indexHeader.setEndPhyOffset(endPhyOffset);
}
if (endTimestamp > 0) {
this.indexHeader.setBeginTimestamp(endTimestamp);
this.indexHeader.setEndTimestamp(endTimestamp);
}
}
/**
* 冲突生成的链条:
* absSlotPos:没有冲突该值为0。
* 如果存在多个冲突是如何处理的呢?
* 假设MsgKey分别为1、2、3,其对应的indexCount分别为10、20、30。并且这三个MsgKey对应的keyHash一样即出现冲突。
* 1、MsgKey = 1,即对应的(absIndexPos + 4 + 8 + 4)为0,absIndexPos为10。
* 2、MsgKey = 2,即对应的(absIndexPos + 4 + 8 + 4)为10,absIndexPos为20。
* 3、MsgKey = 3,即对应的(absIndexPos + 4 + 8 + 4)为20,absIndexPos为30。
* 注意发生冲突并没有丢弃旧MsgKey,也没有发生覆盖。
* @param key
* @param phyOffset
* @param storeTimestamp
* @return
*/
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
int keyHash = indexKeyHashMethod(key);
// 根据MsgKey的hash值 计算槽位置逻辑值
int slotPos = keyHash % this.hashSlotNum;
// 计算槽位置的绝对值。
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
try {
// 如果不存在hash冲突则slotValue始终为0。否则存在hash冲突,并且 slotValue 表示 与当前MsgKey冲突的旧Msg
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
int indexCount = this.indexHeader.getIndexCount();
// 计算index条目的绝对位置
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + indexCount * indexSize;
// MsgKey的hash值
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
//MsgKey的物理偏移量
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
//MsgKey的存储时间
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
//MsgKey的槽值通常为0。如果存在hash冲突则表示与当前MsgKey冲突的旧MsgKey的偏移量。
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// 槽位置绝对值absSlotPos对应值的实际意义为 当前indexFile现有的条目数。其实也是定位MsgKey的偏移量
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
}
}
return false;
}
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end) {
if (this.mappedFile.hold()) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
try {
// 获取当前MsgKey的偏移量,或者当前MsgKey添加时IndexFile中现有的条目数
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() ||
this.indexHeader.getIndexCount() <= 1) {
} else {// 存在hash冲突
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
// 获取当前MsgKey对应的其实position
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead
* indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
// 获取到与当前MsgKey冲突的最近旧MsgKey的偏移量
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
// 再根据时间维度从众多冲突的key中确定需要返回的key
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = timeRead >= begin && timeRead <= end;
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() ||
prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
this.mappedFile.release();
}
}
}
疑问:处理冲突的时候涉及时间维度,为啥现实通过msgKey查询具体消息的时候没有时间维度的选择呢?是不是会把对应冲突的消息都给返回呢?
2.IndexHeader
IndexHeader共占用40个字节。其中包括Long类型的 起始【beginTimestampIndex】 & 结束【endTimestampIndex】时间以及起始【beginPhyoffsetIndex】 & 结束【endPhyoffsetIndex】消息物理偏移量4个指标,int类型的槽表个数【hashSlotcountIndex】以及索引个数【indexCountIndex】2个指标。