通过这三个文件彻底搞懂rocketmq的存储原理
共 15613字,需浏览 32分钟
·
2021-10-25 18:58
RocketMQ是阿里开发的一个高性能的消息队列,支持各种消息类型,而且支持事务消息,可以说是现在的很多系统中的香饽饽了,所以呢,怎么使用大家肯定是要学习的
我们作为一个有梦想的程序员,在学习一门技术的时候,肯定是不能光知其然,这是远远不够的,我们必须要知其所以然,这样才能在面试的时候侃侃而谈,啊呸,不对,这样我们才能在工作中遇到问题的时候,理性的去思考如何解决问题
我们知道RocketMQ的架构是producer、NameServer、broker、Consumer,producer是生产消息的,NameServer是路由中心,负责服务的注册发现以及路由管理这些。
Consumer是属于消费消息的,broker则属于真正的存储消息,以及进行消息的持久化,也就是存储消息的文件和索引消息的文件都在broker上
消息队列的主要作用是解耦异步削峰,也就意味着消息队列中的存储功能是必不可少的,而随着时代的发展,业务量的增加也对消息队列的存储功能的强度的要求越来越高了
也就是说你不能光性能好,你得存储的消息也得足够支撑我的业务量,你只能存储100MB的消息,我这系统每分钟的消息业务量可能500MB了,那肯定不够使啊,那还削个啥的峰啊,峰来了你自己都顶不住
RocketMQ凭借其强大的存储能力和强大的消息索引能力,以及各种类型消息和消息的特性脱颖而出,于是乎,我们这些有梦想的程序员学习RocketMQ的存储原理也变得尤为重要
而要说起这个存储原理,则不得不说的就是RocketMQ的消息存储文件commitLog文件,消费方则是凭借着巧妙的设计Consumerqueue文件来进行高性能并且不混乱的消费,还有RocketMQ的强大的支持消息索引的特性,靠的就是indexfile索引文件
我们这篇文章就从这commitLog、Consumerqueue、indexfile这三个神秘的文件说起,搞懂这三个文件,RocketMQ的核心就被你掏空了
先上个图,写入commitLog文件时commitLog和Consumerqueue、indexfile文件三者的关系
Commitlog文件
大小和命名规则
文件存储规则和特点
简单看下源码
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
// 从节点不允许写入
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
// store是否允许写入
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
// topic过长
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
// 消息附加属性过长
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
// 添加消息到commitLog
PutMessageResult result = this.commitLog.putMessage(msg);
long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
consumerQueue文件
存放位置和结构
ConsumeQueue的作用
offsetTable.offset
简单看下构建过程
public void run() {
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput(); // 构建ComsumerQueue
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
}
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);// 拿到所有的最新写入CommitLog的数据
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // 一条一条的读消息
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest); // 派发消息,进行处理,其中就包括构建ComsumerQueue
this.reputFromOffset += size;
readSize += size;
} else if (size == 0) { //
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) { // 获取消息异常
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
indexFile文件
文件结构
文件详细信息
//8位 该索引文件的第一个消息(Message)的存储时间(落盘时间)
this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());
//8位 该索引文件的最后一个消息(Message)的存储时间(落盘时间)
this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());
//8位 该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息)
this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());
//8位 该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量
this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());
//4位 该索引文件目前的hash slot的个数
this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());
//4位 索引文件目前的索引个数
this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());
//slot的数据存放位置 40 + keyHash %(500W)* 4
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
//Slot Table
//4字节
//记录该slot当前index,如果hash冲突(即absSlotPos一致)作为下一次该slot新增的前置index
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
//Index Linked list
//topic+message key的hash值
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
//消息在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
//消息的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
//9、记录该slot上一个index
//hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件),每个slot位置的第一个消息的prevIndex就是0的
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
再论结构
查询流程
公式1:第n个slot在indexFile中的起始位置是这样:40+(n-1)*4 公式2:第s个index在indexFile中的起始位置是这样:40+5000000*4+(s-1)*20
包括当前机器IP+进程号+MessageClientIDSetter.class.getClassLoader()的hashCode值+消息生产时间与broker启动时间的差值+broker启动后从0开始单调自增的int值,前面三项很明显可能重复,后面两项一个是时间差,一个是重启归零,也可能重复
简单看下源码,感兴趣的下载源码去研究
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
//1. 判断该索引文件的索引数小于最大的索引数,如果>=最大索引数,IndexService就会尝试新建一个索引文件
if (this.indexHeader.getIndexCount() < this.indexNum) {
//2. 计算该message key的hash值
int keyHash = indexKeyHashMethod(key);
//3. 根据message key的hash值散列到某个hash slot里
int slotPos = keyHash % this.hashSlotNum;
//4. 计算得到该hash slot的实际文件位置Position
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
try {
//5. 根据该hash slot的实际文件位置absSlotPos得到slot里的值
//这里有两种情况:
//1). slot=0, 当前message的key是该hash值第一个消息索引
//2). slot>0, 该key hash值上一个消息索引的位置
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
//6. 数据校验及修正
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
//7. 计算当前消息索引具体的存储位置(Append模式)
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
//8. 存入该消息索引
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
//9. 关键之处:在该key hash slot处存入当前消息索引的位置,下次通过该key进行搜索时
//会找到该key hash slot -> slot value -> curIndex ->
//if(curIndex.prevIndex>0) pre index (一直循环 直至该curIndex.prevIndex==0就停止)
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
public void selectPhyOffset(final List
phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
//1. 计算该key的hash
int keyHash = indexKeyHashMethod(key);
//2. 计算该hash value 对应的hash slot位置
int slotPos = keyHash % this.hashSlotNum;
//3. 计算该hash value 对应的hash slot物理文件位置
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
//4. 取出该hash slot 的值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
//5. 该slot value <= 0 就代表没有该key对应的消息索引,直接结束搜索
// 该slot value > maxIndexCount 就代表该key对应的消息索引超过最大限制,数据有误,直接结束搜索
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
//6. 从当前slot value 开始搜索
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
//7. 找到当前slot value(也就是index count)物理文件位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
//8. 读取消息索引数据
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
//9. 获取该消息索引的上一个消息索引index(可以看成链表的prev 指向上一个链节点的引用)
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
//10. 数据校验
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
//10. 数据校验比对 hash值和落盘时间
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
//当prevIndex <= 0 或prevIndex > maxIndexCount 或prevIndexRead == nextIndexToRead 或 timeRead < begin 停止搜索
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
this.mappedFile.release();
}
}
}
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️