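RocketMQ Storage: MappedFile
Contents
- I. Overview
- II. Implementation details
- Fields
- Initialization
- Writing data
- Committing data
- Flushing data
- Reading data
- Destroying the file
- Warming up the memory mapping
- III. Summary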
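I. Overview
RocketMQ storage relies on three kinds of files: CommitLog, ConsumeQueue and IndexFile. The official documentation describes what each one does; here we focus on the source code. All three rely on MappedFile for their low-level reads and writes. As the class name suggests, MappedFile implements file operations on top of memory-mapped files. This article walks through the implementation details of MappedFile (version 4.8.0).
II. Implementation details
Fields
Let's start with the fields of MappedFile: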
    /**
     * Operating system memory page size
     */
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    /**
     * Write position: how far data has been written into the buffer
     */
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    /**
     * Commit position: how far data has been committed into the file (through fileChannel)
     */
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    /**
     * Flush position: how far data has been flushed to disk
     */
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    protected int fileSize;
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;
    /**
     * Transient storage pool, i.e. a pool of reusable buffers
     */
    protected TransientStorePool transientStorePool = null;
    private String fileName;
    private long fileFromOffset;
    private File file;
    /**
     * Buffer backed by the memory-mapped file
     */
    private MappedByteBuffer mappedByteBuffer;
    private volatile long storeTimestamp = 0;
    private boolean firstCreateInQueue = false;
Initialization
The constructors of MappedFile:
    public MappedFile() {
    }

    public MappedFile(final String fileName, final int fileSize) throws IOException {
        init(fileName, fileSize);
    }

    public MappedFile(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize, transientStorePool);
    }
Both constructors that take arguments delegate to the corresponding overload of init to do the initialization.
The init methods:
    public void init(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        // The writeBuffer field is assigned only when a transientStorePool is passed in
        this.writeBuffer = transientStorePool.borrowBuffer();
        this.transientStorePool = transientStorePool;
    }

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        // Offset of the mapped file's first byte within the whole file group
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        ensureDirOK(this.file.getParent());

        try {
            // Open a channel on the file
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            // Map the file into memory
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
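From the two init overloads we can draw two conclusions:
1. writeBuffer is assigned only when a transientStorePool is passed to init.
2. The name of a mapped file must be numeric. The name gives the position of the file's first byte within the whole file group (fileFromOffset).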
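Here is a minimal sketch that makes conclusion 2 concrete. It assumes a file name is its start offset zero-padded to 20 digits. This should match how RocketMQ names CommitLog files: with the default 1 GiB file size, 00000000000000000000 is followed by 00000000001073741824. The class MappedFileNaming and its methods are hypothetical. Only the Long.parseLong(file.getName()) parsing and the FileChannel.map(READ_WRITE, 0, fileSize) call mirror init directly.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

/** Hypothetical helper: a numeric file name doubles as the file's start offset. */
class MappedFileNaming {
    // Assumption: the name is the start offset, zero-padded to 20 digits.
    static String offsetToFileName(long offset) {
        return String.format("%020d", offset);
    }

    // Same parsing as MappedFile.init: Long.parseLong(file.getName()).
    static long fileFromOffset(File file) {
        return Long.parseLong(file.getName());
    }

    // Opens (creating if needed) the file and maps fileSize bytes read-write, as init does.
    static MappedByteBuffer map(File file, int fileSize) {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
             FileChannel channel = raf.getChannel()) {
            // A mapping stays valid after its channel is closed; MappedFile keeps the channel open anyway.
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("mappedfile-demo").toFile();
        int fileSize = 1024 * 1024;                      // 1 MiB per file for the demo
        // The second file in the group starts right after the first one ends.
        File second = new File(dir, offsetToFileName(fileSize));
        MappedByteBuffer buf = map(second, fileSize);
        System.out.println(second.getName() + " -> fileFromOffset=" + fileFromOffset(second)
                + ", mapped capacity=" + buf.capacity());
    }
}
```

Because the offset is encoded in the name, a reader can locate the file holding any global offset just by comparing it against the file names. No index is needed.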
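Writing data
Writes come in two forms: writing a message object of type MessageExt, and writing raw binary data as a byte[].
Let's pick one of the overloads and look at how binary data is written: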
    public boolean appendMessage(final byte[] data, final int offset, final int length) {
        // Position written so far
        int currentPos = this.wrotePosition.get();

        // Check whether the file has enough room left for the data
        if ((currentPos + length) <= this.fileSize) {
            try {
                // Write the data directly through fileChannel
                this.fileChannel.position(currentPos);
                this.fileChannel.write(ByteBuffer.wrap(data, offset, length));
            } catch (Throwable e) {
                log.error("Error occurred when append message to mappedFile.", e);
            }
            this.wrotePosition.addAndGet(length);
            return true;
        }

        return false;
    }
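Writing a byte[] is blunt. If the space left in the file is at least the length of the data, the data is written directly through fileChannel; otherwise the write fails and false is returned. Note also that if the channel write throws, the exception is only logged: wrotePosition is still advanced and true is still returned.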
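The bounds check can be reproduced in a few lines. The sketch below is a simplified, hypothetical ByteArrayAppender, not RocketMQ code. It writes through a FileChannel at wrotePosition and rejects data that would not fit. One difference is deliberate: it propagates I/O errors instead of logging them and still advancing the pointer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified version of the bounds check in appendMessage(byte[], int, int). */
class ByteArrayAppender implements AutoCloseable {
    private final FileChannel channel;
    private final int fileSize;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    ByteArrayAppender(Path path, int fileSize) {
        try {
            this.channel = FileChannel.open(path, StandardOpenOption.CREATE,
                    StandardOpenOption.READ, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.fileSize = fileSize;
    }

    /** Writes data[offset, offset + length) at wrotePosition; returns false if it would not fit. */
    boolean append(byte[] data, int offset, int length) {
        int currentPos = wrotePosition.get();
        if (currentPos + length > fileSize) {
            return false;                        // not enough room left: nothing is written
        }
        try {
            channel.position(currentPos);        // same approach as the original
            ByteBuffer src = ByteBuffer.wrap(data, offset, length);
            while (src.hasRemaining()) {         // write() may be partial in general
                channel.write(src);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);   // the original only logs here
        }
        wrotePosition.addAndGet(length);
        return true;
    }

    int wrotePosition() {
        return wrotePosition.get();
    }

    @Override
    public void close() {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```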
Writing a MessageExt message object:
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

        // Position written so far
        int currentPos = this.wrotePosition.get();

        // Only write while the file is not yet full
        if (currentPos < this.fileSize) {
            // Pick the target: writeBuffer if it is not null, otherwise the memory-mapped mappedByteBuffer
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            // AppendMessageCallback performs the actual write
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // Advance wrotePosition
            this.wrotePosition.addAndGet(result.getWroteBytes());
            // Record the latest message store timestamp
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
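Writing a message object differs from writing a byte[]: it is allowed as long as the file is not yet full. The message goes into the writeBuffer staging buffer if there is one, and otherwise straight into mappedByteBuffer, the memory-mapped file. The actual serialization and write are implemented in the AppendMessageCallback, which we leave aside in this article.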
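Why does appendMessagesInner call slice() before position(currentPos)? A slice shares the underlying memory but has its own position and limit. Moving the slice's position therefore never disturbs the shared writeBuffer or mappedByteBuffer. The hypothetical SliceDemo below shows this, with a plain buffer standing in for the mapping. Note that slice() starts at the shared buffer's current position. The sketch assumes that position stays 0, which holds for the code shown in this article because it only touches those buffers through slice().

```java
import java.nio.ByteBuffer;

/** Hypothetical demo of writing through a slice so the shared buffer's position is untouched. */
class SliceDemo {
    // Writes payload at currentPos via a slice; returns the number of bytes written.
    static int writeAt(ByteBuffer shared, int currentPos, byte[] payload) {
        ByteBuffer view = shared.slice();   // same content, independent position/limit
        view.position(currentPos);          // index currentPos of the slice == index currentPos of shared (position 0)
        view.put(payload);
        return payload.length;              // analogous to AppendMessageResult.getWroteBytes()
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.allocateDirect(16);
        int wrote = writeAt(shared, 4, new byte[]{42, 43});
        System.out.println("wrote=" + wrote + ", shared.position()=" + shared.position()
                + ", shared.get(4)=" + shared.get(4));
    }
}
```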
Committing data
Committing copies the data in the writeBuffer staging buffer into the file. The implementation:
    public int commit(final int commitLeastPages) {
        // If writeBuffer is null, data was written straight into mappedByteBuffer, so there is nothing to commit
        if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            return this.wrotePosition.get();
        }
        // Check whether the commit condition is met (each commit covers at least commitLeastPages pages)
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                commit0(commitLeastPages);
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }

        // All dirty data has been committed to FileChannel.
        // The whole file has been committed, so writeBuffer goes back to the pool
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }

        return this.committedPosition.get();
    }

    protected void commit0(final int commitLeastPages) {
        int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();

        // The author believes this condition is wrong, since it compares a byte count with a page count.
        // It should be writePos - lastCommittedPosition > 0, i.e. "is there any uncommitted data"
        if (writePos - lastCommittedPosition > commitLeastPages) {
            try {
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }

    protected boolean isAbleToCommit(final int commitLeastPages) {
        int flush = this.committedPosition.get();
        int write = this.wrotePosition.get();

        // A full file can always be committed
        if (this.isFull()) {
            return true;
        }

        // Commit once the pending data spans at least commitLeastPages pages
        if (commitLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
        }

        return write > flush;
    }
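First, check whether the writeBuffer staging buffer exists. If it does not, there is nothing to commit, because the data was written directly into the file mapping.
Next, check whether the pending data spans at least commitLeastPages pages. If it does not, the commit is postponed.
Finally, commit0 performs the commit, writing the data through fileChannel.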
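The page arithmetic in isAbleToCommit is easy to misread: it counts page boundaries crossed, not bytes pending. The hypothetical CommitSketch below restates that check, with isFull() replaced by an explicit wrote == fileSize comparison. It adds a commit step, modeled on commit0, that copies the range [committed, wrote) from writeBuffer into the FileChannel at the same offset. Unlike commit0, it copies whenever anything is pending, which is the condition the author argues for above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Hypothetical sketch of the commit decision and copy, modeled on isAbleToCommit/commit0. */
class CommitSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;

    // Same arithmetic as isAbleToCommit, with isFull() passed in as wrote == fileSize.
    static boolean isAbleToCommit(int committed, int wrote, int fileSize, int commitLeastPages) {
        if (wrote == fileSize) {
            return true;                                   // a full file is always committed
        }
        if (commitLeastPages > 0) {
            // Counts page boundaries crossed, not bytes: 4095 -> 4096 already counts as one page.
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= commitLeastPages;
        }
        return wrote > committed;                          // 0 means "commit anything pending"
    }

    // Copies writeBuffer[committed, wrote) to the same offset in the channel; returns the new committed position.
    static int commit(ByteBuffer writeBuffer, FileChannel channel, int committed, int wrote) {
        if (wrote <= committed) {
            return committed;                              // nothing pending
        }
        ByteBuffer pending = writeBuffer.slice();
        pending.position(committed);
        pending.limit(wrote);
        try {
            channel.position(committed);
            while (pending.hasRemaining()) {
                channel.write(pending);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return wrote;
    }
}
```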
Flushing data
Flushing works much like committing:
    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        // With a writeBuffer, data was committed through fileChannel, so flush through fileChannel
                        this.fileChannel.force(false);
                    } else {
                        // Without a writeBuffer, data went straight into mappedByteBuffer, so flush mappedByteBuffer
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

    /**
     * @return The max position which have valid data
     */
    public int getReadPosition() {
        return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
    }

    private boolean isAbleToFlush(final int flushLeastPages) {
        int flush = this.flushedPosition.get();
        int write = getReadPosition();

        if (this.isFull()) {
            // The file is full, so it can be flushed to disk
            return true;
        }

        if (flushLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        }

        return write > flush;
    }
A flush happens only when the file is full, or when the unflushed data spans at least flushLeastPages pages.
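Here is a minimal sketch of how flush picks its force() target, assuming the caller already knows whether a writeBuffer is in use. FlushSketch is hypothetical. FileChannel.force(boolean) and MappedByteBuffer.force() are the standard JDK calls the original uses, and the returned value follows the same rule as getReadPosition().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

/** Hypothetical sketch of how flush picks the force() target. */
class FlushSketch {
    // Returns the new flushed position, following the getReadPosition() rule.
    static int flush(boolean hasWriteBuffer, FileChannel channel, MappedByteBuffer mapped,
                     int wrote, int committed) {
        int value = hasWriteBuffer ? committed : wrote;
        try {
            // Presumably a non-zero channel position means appendMessage(byte[]) wrote through the channel.
            if (hasWriteBuffer || channel.position() != 0) {
                channel.force(false);   // data reached the file via FileChannel.write
            } else {
                mapped.force();         // data was written straight into the mapping
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return value;
    }
}
```

The false argument to force() asks only for the file content to be written to storage, not file metadata such as the modification time. That is enough when only the data matters.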
Reading data
Reading is also very simple. There are two overloads of selectMappedBuffer; here we look at the one that reads a given number of bytes:
    public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
        // readPosition is the highest position holding readable data (see getReadPosition)
        int readPosition = getReadPosition();
        if ((pos + size) <= readPosition) {
            // The read stays within bounds
            if (this.hold()) {
                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
                byteBuffer.position(pos);
                ByteBuffer byteBufferNew = byteBuffer.slice();
                byteBufferNew.limit(size);
                return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
            } else {
                log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
                    + this.fileFromOffset);
            }
        } else {
            log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
                + ", fileFromOffset: " + this.fileFromOffset);
        }

        return null;
    }
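The readable range ends at getReadPosition(). When a writeBuffer is in use, that is the committed data only: data still sitting in the writeBuffer staging buffer cannot be read yet. Without a writeBuffer, everything written is readable. A request is served only if it stays within this range.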
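Stripped of reference counting and logging, selectMappedBuffer is a double slice. The first slice is moved to pos, the second makes pos its index 0, and the limit caps the window at size bytes. ReadSketch is hypothetical. Its asReadOnlyBuffer() call is an extra safeguard that the original does not have.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of selectMappedBuffer(pos, size) without hold()/release(). */
class ReadSketch {
    // Returns a window over [pos, pos + size), or null if it reaches past readPosition.
    static ByteBuffer select(ByteBuffer mapped, int readPosition, int pos, int size) {
        if (pos < 0 || size < 0 || pos + size > readPosition) {
            return null;
        }
        ByteBuffer view = mapped.slice();
        view.position(pos);
        ByteBuffer window = view.slice();   // index 0 of window == pos in the file
        window.limit(size);
        return window.asReadOnlyBuffer();   // extra safeguard, not in the original
    }
}
```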
Destroying the file
A file is destroyed mainly through the cleanup and destroy methods:
    @Override
    public boolean cleanup(final long currentRef) {
        if (this.isAvailable()) {
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have not shutdown, stop unmapping.");
            return false;
        }

        if (this.isCleanupOver()) {
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have cleanup, do not do it again.");
            return true;
        }

        clean(this.mappedByteBuffer);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
        TOTAL_MAPPED_FILES.decrementAndGet();
        log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
        return true;
    }

    public static void clean(final ByteBuffer buffer) {
        if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
            return;
        invoke(invoke(viewed(buffer), "cleaner"), "clean");
    }

    public boolean destroy(final long intervalForcibly) {
        // shutdown ends up calling cleanup
        this.shutdown(intervalForcibly);

        if (this.isCleanupOver()) {
            try {
                this.fileChannel.close();
                log.info("close file channel " + this.fileName + " OK");

                long beginTime = System.currentTimeMillis();
                // Delete the file
                boolean result = this.file.delete();
                log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                    + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                    + this.getFlushedPosition() + ", "
                    + UtilAll.computeElapsedTimeMilliseconds(beginTime));
            } catch (Exception e) {
                log.warn("close file channel " + this.fileName + " Failed. ", e);
            }

            return true;
        } else {
            log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
                + " Failed. cleanupOver: " + this.cleanupOver);
        }

        return false;
    }
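Unmapping goes through cleanup, whose main job is to release mappedByteBuffer; the actual work happens in clean. clean only handles direct buffers (off-heap memory), and it releases a buffer by calling the clean method of the buffer's own cleaner, which it reaches reflectively through the invoke and viewed helpers.
Once mappedByteBuffer has been released, destroy closes fileChannel and then deletes the file.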
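cleanup refuses to run while the file is still available, and destroy relies on shutdown to trigger it. Behind this is reference counting: every hold() must be matched by a release(), and the buffer is unmapped only when the last reference is dropped. The RefCountedResource below is a simplified, hypothetical model of that idea, not RocketMQ's ReferenceResource. In particular, it does not model the forced release after intervalForcibly, and its cleanup() is only a stand-in for unmapping.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified reference counter; not RocketMQ's ReferenceResource. */
class RefCountedResource {
    private final AtomicLong refCount = new AtomicLong(1);       // the owner's own reference
    private final AtomicBoolean available = new AtomicBoolean(true);
    private volatile boolean cleanupOver = false;

    /** Readers take a reference before touching the buffer; refused once shutdown has started. */
    boolean hold() {
        if (!available.get()) {
            return false;
        }
        if (refCount.getAndIncrement() > 0) {
            return true;
        }
        refCount.getAndDecrement();   // already released to zero: undo and refuse
        return false;
    }

    /** Drops a reference; whoever drops the last one performs the cleanup. */
    void release() {
        if (refCount.decrementAndGet() > 0) {
            return;
        }
        synchronized (this) {
            cleanupOver = cleanup();
        }
    }

    /** Drops the owner's reference exactly once; cleanup waits for in-flight readers. */
    void shutdown() {
        if (available.compareAndSet(true, false)) {
            release();
        }
    }

    boolean isCleanupOver() {
        return cleanupOver;
    }

    /** Stand-in for unmapping; like MappedFile.cleanup it refuses while still available. */
    protected boolean cleanup() {
        return !available.get();
    }
}
```

In this model, a reader that obtained a reference before shutdown keeps the buffer alive until it calls release(). This is why destroy checks isCleanupOver() before closing the channel and deleting the file.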
Warming up the memory mapping
MappedFile provides a warm-up feature:
    public void warmMappedFile(FlushDiskType type, int pages) {
        long beginTime = System.currentTimeMillis();
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        long time = System.currentTimeMillis();
        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            // Write into each page that holds no data yet: this triggers a page fault,
            // so the missing page is loaded into memory now
            byteBuffer.put(i, (byte) 0);
            // force flush when flush disk type is sync
            if (type == FlushDiskType.SYNC_FLUSH) {
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }

            // prevent gc
            // Keep the loop from monopolizing the CPU under preemptive scheduling:
            // sleep(0) triggers a new round of CPU competition
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                }
            }
        }

        // force flush when prepare load finished
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
                this.getFileName(), System.currentTimeMillis() - beginTime);
            mappedByteBuffer.force();
        }
        log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
            System.currentTimeMillis() - beginTime);

        // Lock the mappedByteBuffer region in memory so it is not swapped out to disk
        this.mlock();
    }
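The warm-up writes one zero byte into every OS memory page of the file, starting from offset 0. If a virtual page is not yet backed by the file, this first write triggers a page fault, and the OS maps the page in. The cost is paid up front rather than on the first real write. With SYNC_FLUSH, the mapping is also forced to disk each time the number of newly touched pages reaches the pages argument, and once more at the end. Finally, mlock() keeps the region from being swapped out. To stop the loop from holding the CPU for too long, Thread.sleep(0) is called every 1000 pages to trigger a new round of CPU scheduling.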
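Here is the page-touching loop on its own, as a hypothetical WarmUpSketch. It does one absolute put of a zero byte per page, with Thread.sleep(0) every 1000 pages as in the original. The sketch leaves out the SYNC_FLUSH force() calls and mlock(). Like OS_PAGE_SIZE in MappedFile, it assumes a 4 KB page size.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch of the page-touching loop in warmMappedFile (no force, no mlock). */
class WarmUpSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;   // assumed page size, as in MappedFile

    // Writes one zero byte per page so each page is faulted in now; returns the pages touched.
    static int touchPages(ByteBuffer mapped, int fileSize) {
        int pages = 0;
        for (int i = 0; i < fileSize; i += OS_PAGE_SIZE) {
            mapped.put(i, (byte) 0);            // absolute put: the first write to a page faults it in
            if (++pages % 1000 == 0) {
                try {
                    Thread.sleep(0);            // let other threads get scheduled, as the original does
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return pages;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("warm", ".bin");
        int fileSize = 8 * 1024 * 1024;
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            System.out.println("pages touched: " + touchPages(mapped, fileSize));
        }
    }
}
```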
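III. Summary
From the analysis above, MappedFile wraps a memory-mapped file and provides reading, sequential writing, committing, flushing and warm-up.
The most important pieces are three position pointers:
wrotePosition: the write pointer, i.e. the latest position data has been written to in this MappedFile. The data may still sit in the staging buffer (writeBuffer) and not yet be committed to the file mapping.
committedPosition: the commit pointer, i.e. the latest position data has been written into the file (into the file mapping only, not necessarily flushed to disk). commit() advances it only when a writeBuffer is in use.
flushedPosition: the flush pointer, i.e. the latest position that has been flushed to disk.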
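The relationship among the three pointers can be captured in a small, hypothetical PositionModel. It assumes single-threaded use and ignores the page thresholds. It only shows which pointer moves on write, commit and flush, and what getReadPosition() returns with and without a writeBuffer. As in the original commit(), the committed field is not updated when there is no writeBuffer; the write position is returned instead.

```java
/** Hypothetical, single-threaded model of wrotePosition, committedPosition and flushedPosition. */
class PositionModel {
    private final boolean hasWriteBuffer;
    private int wrote;
    private int committed;
    private int flushed;

    PositionModel(boolean hasWriteBuffer) {
        this.hasWriteBuffer = hasWriteBuffer;
    }

    void write(int bytes) {
        wrote += bytes;                              // lands in writeBuffer or directly in the mapping
    }

    int commit() {
        if (!hasWriteBuffer) {
            return wrote;                            // nothing to copy; the committed field is left alone
        }
        committed = wrote;                           // writeBuffer contents copied into the file
        return committed;
    }

    int readPosition() {
        return hasWriteBuffer ? committed : wrote;   // same rule as getReadPosition()
    }

    void flush() {
        flushed = readPosition();                    // only readable data is forced to disk
    }

    int committed() { return committed; }

    int flushed() { return flushed; }
}
```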
