浅析RocketMQ-MappedFileQueue和MappedFile
RocketMQ的Commit,Comsumequeue,Index文件的代码实现都是MappedFile,而MappedFileQueue则持有了多个MappedFile,可以理解为对应的文件夹。本文主要分析下其重要的方法。
一.创建MappedFile
RocketMQ要向MappedFile中写入数据时,会调用getLastMappedFile获取最新的写入文件
1.getLastMappedFile
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset = -1;// 获取MappedFile集合中末尾的文件,对应的MappedFileQueue的mappedFilesMappedFile mappedFileLast = getLastMappedFile();// 1.MappedFile不存在,计算新文件初始偏移量,注意这里不为0if (mappedFileLast == null) {createOffset = startOffset - (startOffset % this.mappedFileSize);}// MappedFile存在但是无法写入了,计算新文件初始偏移量if (mappedFileLast != null && mappedFileLast.isFull()) {createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}// 创建新文件if (createOffset != -1 && needCreate) {return tryCreateMappedFile(createOffset);}return mappedFileLast;}
注释1处,计算新文件偏移量为啥直接不为0呢?
因为MappedFile文件可能被删除了,故需要重新计算
protected MappedFile tryCreateMappedFile(long createOffset) {// 拼接文件路径,这里获取了下个文件路径和下下的文件路径String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset+ this.mappedFileSize);return doCreateMappedFile(nextFilePath, nextNextFilePath);}
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {MappedFile mappedFile = null;// 正常是allocateMappedFileService在初始化时会创建if (this.allocateMappedFileService != null) {mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {// 否则直接构建文件mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {}}if (mappedFile != null) {// 设置首次创建标识if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}// 持有新的mappedFilethis.mappedFiles.add(mappedFile);}return mappedFile;}
2.putRequestAndReturnMappedFile
这里有个transientStorePoolEnable参数,等于true时,开启堆外内存配置,表示消息存储时会先存在堆外内存,然后通过Commit线程将数据提交到内存映射的Buffer中,最后通过Flush线程将数据持久化到磁盘中。
putRequestAndReturnMappedFile会生成下个文件和下下个文件。但提前生成的下下个文件不会返回,留到下次调用直接返回。这样做到每次快人一步,提升了运行效率。
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {int canSubmitRequests = 2;// 开启了transientStorePoolEnable,默认为falseif (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {// 开启了fastFailIfNoBufferInStorePool配置,默认为falseif (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()// 要求是主节点&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) {// 分配的堆外空间是有限数量的canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();}}// 生成下个文件,包装一下成一个特定请求AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;if (nextPutOK) {if (canSubmitRequests <= 0) {this.requestTable.remove(nextFilePath);return null;}// requestQueue是一个堵塞队列 // 初始化时AllocateMappedFileService会调用一个线程,不断读取requestQueue数据,生成具体的文件boolean offerOK = this.requestQueue.offer(nextReq);canSubmitRequests--;}// 生成下下个文件,跟生成生成下个文件逻辑一致AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;if (nextNextPutOK) {if (canSubmitRequests <= 0) {this.requestTable.remove(nextNextFilePath);} else {boolean offerOK = this.requestQueue.offer(nextNextReq);}}// 具体生成文件过程有误,这里拦截方法返回if (hasException) {return null;}// 只获取生成的下个文件结果,下下个文件让线程慢慢运行AllocateRequest result = this.requestTable.get(nextFilePath);try {if (result != null) {boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);if (!waitOK) {return null;} else {this.requestTable.remove(nextFilePath);return result.getMappedFile();}}} catch (InterruptedException e) {}return null;}
AllocateMappedFileService 调度运行的线程,执行mmapOperation进行创建文件
private boolean mmapOperation() {boolean isSuccess = false;AllocateRequest req = null;try {// 获取创建文件的请求,获取不到则堵塞住req = this.requestQueue.take();AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());// 请求对象发生变动,直接返回if (null == expectedRequest) {return true;}if (expectedRequest != req) {return true;}// 这里不为null说明,已经提前生成了,对应了生成下下个文件的逻辑if (req.getMappedFile() == null) {long beginTime = System.currentTimeMillis();MappedFile mappedFile;// 开启了堆外内存if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {try {mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();// 这里比直接构造,多了一个将堆外内存分配给writeBuffer的操作mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());} catch (RuntimeException e) {mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());}} else {// 直接构建对象mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());}// 文件预热操作if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()&&this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {// getFlushDiskType为异步刷盘,getFlushLeastPagesWhenWarmMapedFile为4kmappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());}req.setMappedFile(mappedFile);this.hasException = false;isSuccess = true;}} ... // 省略异常捕获处理,主要设置hasException=true} finally {// 对应putRequestAndReturnMappedFile的getCountDownLatch.await操作if (req != null && isSuccess)req.getCountDownLatch().countDown();}return true;}
3. warmMappedFile
为啥要进行文件预热?
我们知道RocketMQ使用了内存映射技术mmap,它将文件在磁盘位置的地址和的虚拟地址通过映射对应起来。但是操作系统并没有加载到物理内存中。文件预热可以理解为将数据加载到物理内存的操作。
public void warmMappedFile(FlushDiskType type, int pages) {// 创建一个共享的缓存区ByteBuffer byteBuffer = this.mappedByteBuffer.slice();int flush = 0;// MappedFile.OS_PAGE_SIZE= 4kfor (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {// 1.每隔4k,向缓存区写入一个字节byteBuffer.put(i, (byte) 0);// 同步刷盘if (type == FlushDiskType.SYNC_FLUSH) {// 每隔16K,强制刷盘一次,pages=4k,OS_PAGE_SIZE为=4kif ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {flush = i;mappedByteBuffer.force();}}// 这里是让出CPU,避免长时间占用if (j % 1000 == 0) {try {Thread.sleep(0);} catch (InterruptedException e) {}}}// 强制刷盘if (type == FlushDiskType.SYNC_FLUSH) {mappedByteBuffer.force();}// 将进程使用的部分或全部的地址空间锁定在物理内存中,防止其被交换到swap空间this.mlock();}
注释1 为啥每次写入是4k?
这跟Page Cache 页缓存有关,每一页大小约为4k。系统每次读写数据会先到Page Cache,再到硬盘,如果请求过来,在页缓存上没对应的数据,则会发生缺页中断,磁盘重新加载数据到内存。每隔4k写入,用于保证不会发生缺页。
二.查找MappedFile
findMappedFileByOffset 根据偏移量查找文件
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {// 获取首个文件MappedFile firstMappedFile = this.getFirstMappedFile();//获取最后一个文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {// 偏移量小于现有最小的或者大于现有最大的,直接返回if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {} else {//计算文件索引,这里也是考虑到首个文件偏移量不为0的情况int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}// 正好存在if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) { }return null;}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
