|
| 1 | +# RocketMQ MappedFile 内存映射文件详解 |
| 2 | + |
| 3 | +1、MappedFile 初始化 |
| 4 | + |
| 5 | +```java |
| 6 | +private void init(final String fileName, final int fileSize) throws IOException { |
| 7 | + this.fileName = fileName; |
| 8 | + this.fileSize = fileSize; |
| 9 | + this.file = new File(fileName); |
| 10 | + this.fileFromOffset = Long.parseLong(this.file.getName()); |
| 11 | + boolean ok = false; |
| 12 | + |
| 13 | + ensureDirOK(this.file.getParent()); |
| 14 | + |
| 15 | + try { |
| 16 | + this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); |
| 17 | + this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); |
| 18 | + TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); |
| 19 | + TOTAL_MAPPED_FILES.incrementAndGet(); |
| 20 | + ok = true; |
| 21 | + } catch (FileNotFoundException e) { |
| 22 | + log.error("Failed to create file " + this.fileName, e); |
| 23 | + throw e; |
| 24 | + } catch (IOException e) { |
| 25 | + log.error("Failed to map file " + this.fileName, e); |
| 26 | + throw e; |
| 27 | + } finally { |
| 28 | + if (!ok && this.fileChannel != null) { |
| 29 | + this.fileChannel.close(); |
| 30 | + } |
| 31 | + } |
| 32 | +} |
| 33 | +``` |
| 34 | + |
| 35 | +初始化`fileFromOffset`,因为 commitLog 文件夹下的文件都是以偏移量为命名的,所以转成了 long 类型 |
| 36 | + |
| 37 | +确认文件目录是否存在,不存在则创建 |
| 38 | + |
| 39 | +```java |
| 40 | +public static void ensureDirOK(final String dirName) { |
| 41 | + if (dirName != null) { |
| 42 | + if (dirName.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { |
| 43 | + String[] dirs = dirName.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); |
| 44 | + for (String dir : dirs) { |
| 45 | + createDirIfNotExist(dir); |
| 46 | + } |
| 47 | + } else { |
| 48 | + createDirIfNotExist(dirName); |
| 49 | + } |
| 50 | + } |
| 51 | +} |
| 52 | +``` |
| 53 | + |
| 54 | +通过`RandomAccessFile`设置 fileChannel |
| 55 | + |
| 56 | +`this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();` |
| 57 | + |
| 58 | +使用 NIO 内存映射将文件映射到内存中 |
| 59 | + |
| 60 | +`this.mappedByteBuffer = this.fileChannel.map(MapMode.*READ_WRITE*, 0, fileSize);` |
| 61 | + |
| 62 | +2、MappedFile 提交 |
| 63 | + |
| 64 | +```java |
| 65 | +public int commit(final int commitLeastPages) { |
| 66 | + if (writeBuffer == null) { |
| 67 | + //no need to commit data to file channel, so just regard wrotePosition as committedPosition. |
| 68 | + return this.wrotePosition.get(); |
| 69 | + } |
| 70 | + if (this.isAbleToCommit(commitLeastPages)) { |
| 71 | + if (this.hold()) { |
| 72 | + commit0(); |
| 73 | + this.release(); |
| 74 | + } else { |
| 75 | + log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); |
| 76 | + } |
| 77 | + } |
| 78 | + |
| 79 | + // All dirty data has been committed to FileChannel. |
| 80 | + if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { |
| 81 | + this.transientStorePool.returnBuffer(writeBuffer); |
| 82 | + this.writeBuffer = null; |
| 83 | + } |
| 84 | + |
| 85 | + return this.committedPosition.get(); |
| 86 | +} |
| 87 | +``` |
| 88 | + |
| 89 | +如果 wroteBuffer 为空,直接返回 wrotePosition |
| 90 | + |
| 91 | +```java |
| 92 | +if (writeBuffer == null) { |
| 93 | + //no need to commit data to file channel, so just regard wrotePosition as committedPosition. |
| 94 | + return this.wrotePosition.get(); |
| 95 | +} |
| 96 | +``` |
| 97 | + |
| 98 | +判断是否执行 commit 操作: |
| 99 | + |
| 100 | +如果文件已满,返回 true |
| 101 | + |
| 102 | +```java |
| 103 | +if (this.isFull()) { |
| 104 | + return true; |
| 105 | +} |
| 106 | +``` |
| 107 | + |
| 108 | +```java |
| 109 | +public boolean isFull() { |
| 110 | + return this.fileSize == this.wrotePosition.get(); |
| 111 | +} |
| 112 | +``` |
| 113 | + |
| 114 | +commitLeastPages 为本次提交的最小页数,如果 commitLeastPages 大于 0,计算当前写指针(`wrotePosition`)与上一次提交的指针`committedPosition`的差值 除以页*`OS_PAGE_SIZE`*的大小得到脏页数量,如果大于 commitLeastPages,就可以提交。如果 commitLeastPages 小于 0,则存在脏页就提交 |
| 115 | + |
| 116 | +```java |
| 117 | +if (commitLeastPages > 0) { |
| 118 | + return ((write /OS_PAGE_SIZE) - (flush /OS_PAGE_SIZE)) >= commitLeastPages; |
| 119 | +} |
| 120 | + |
| 121 | +return write > flush; |
| 122 | +``` |
| 123 | + |
| 124 | +MapperFile 具体的提交过程,首先创建 `writeBuffer`的共享缓存区,设置 position 为上一次提交的位置`committedPosition` ,设置 limit 为`wrotePosition`当前写指针,接着将 committedPosition 到 wrotePosition 的数据写入到 FileChannel 中,最后更新 committedPosition 指针为 wrotePosition |
| 125 | + |
| 126 | +```java |
| 127 | +protected void commit0() { |
| 128 | + int writePos = this.wrotePosition.get(); |
| 129 | + int lastCommittedPosition = this.committedPosition.get(); |
| 130 | + |
| 131 | + if (writePos - lastCommittedPosition > 0) { |
| 132 | + try { |
| 133 | + ByteBuffer byteBuffer = writeBuffer.slice(); |
| 134 | + byteBuffer.position(lastCommittedPosition); |
| 135 | + byteBuffer.limit(writePos); |
| 136 | + this.fileChannel.position(lastCommittedPosition); |
| 137 | + this.fileChannel.write(byteBuffer); |
| 138 | + this.committedPosition.set(writePos); |
| 139 | + } catch (Throwable e) { |
| 140 | + log.error("Error occurred when commit data to FileChannel.", e); |
| 141 | + } |
| 142 | + } |
| 143 | +} |
| 144 | +``` |
| 145 | + |
| 146 | +3、MappedFile 刷盘 |
| 147 | + |
| 148 | +判断是否要进行刷盘 |
| 149 | + |
| 150 | +文件是否已满 |
| 151 | + |
| 152 | +```java |
| 153 | +if (this.isFull()) { |
| 154 | + return true; |
| 155 | +} |
| 156 | +``` |
| 157 | + |
| 158 | +```java |
| 159 | +public boolean isFull() { |
| 160 | + return this.fileSize == this.wrotePosition.get(); |
| 161 | +} |
| 162 | +``` |
| 163 | + |
| 164 | +如果`flushLeastPages`大于 0,判断写数据指针位置-上次刷盘的指针位置, 然后除以*`OS_PAGE_SIZE 是否大于等于`*`flushLeastPages` |
| 165 | + |
| 166 | +如果 flushLeastPages 小于等于 0,判断是否有要刷盘的数据 |
| 167 | + |
| 168 | +```java |
| 169 | +if (flushLeastPages > 0) { |
| 170 | + return ((write /OS_PAGE_SIZE) - (flush /OS_PAGE_SIZE)) >= flushLeastPages; |
| 171 | +} |
| 172 | + |
| 173 | +return write > flush; |
| 174 | +``` |
| 175 | + |
| 176 | +获取最大读指针 |
| 177 | + |
| 178 | +```java |
| 179 | +public int getReadPosition() { |
| 180 | + return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); |
| 181 | +} |
| 182 | +``` |
| 183 | + |
| 184 | +将数据刷出到磁盘 |
| 185 | + |
| 186 | +如果`writeBuffer`不为空或者通道的 position 不等于 0,通过 fileChannel 将数据刷新到磁盘 |
| 187 | + |
| 188 | +否则通过 MappedByteBuffer 将数据刷新到磁盘 |
| 189 | + |
| 190 | +4、MappedFile 销毁 |
| 191 | + |
| 192 | +```java |
| 193 | +public boolean destroy(final long intervalForcibly) { |
| 194 | + this.shutdown(intervalForcibly); |
| 195 | + |
| 196 | + if (this.isCleanupOver()) { |
| 197 | + try { |
| 198 | + this.fileChannel.close(); |
| 199 | + log.info("close file channel " + this.fileName + " OK"); |
| 200 | + |
| 201 | + long beginTime = System.currentTimeMillis(); |
| 202 | + boolean result = this.file.delete(); |
| 203 | + log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName |
| 204 | + + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" |
| 205 | + + this.getFlushedPosition() + ", " |
| 206 | + + UtilAll.computeElapsedTimeMilliseconds(beginTime)); |
| 207 | + } catch (Exception e) { |
| 208 | + log.warn("close file channel " + this.fileName + " Failed. ", e); |
| 209 | + } |
| 210 | + |
| 211 | + return true; |
| 212 | + } else { |
| 213 | + log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName |
| 214 | + + " Failed. cleanupOver: " + this.cleanupOver); |
| 215 | + } |
| 216 | + |
| 217 | + return false; |
| 218 | +} |
| 219 | +``` |
| 220 | + |
| 221 | +1> 关闭 MappedFile |
| 222 | + |
| 223 | +第一次调用时 this.`available为true`,设置 available 为 false,设置第一次关闭的时间戳为当前时间戳,调用 release()释放资源,只有在引用次数小于 1 的时候才会释放资源,如果引用次数大于 0,判断当前时间与 firstShutdownTimestamp 的差值是否大于最大拒绝存活期`intervalForcibly`,如果大于等于最大拒绝存活期,将引用数减少 1000,直到引用数小于 0 释放资源 |
| 224 | + |
| 225 | +```java |
| 226 | +public void shutdown(final long intervalForcibly) { |
| 227 | + if (this.available) { |
| 228 | + this.available = false; |
| 229 | + this.firstShutdownTimestamp = System.currentTimeMillis(); |
| 230 | + this.release(); |
| 231 | + } else if (this.getRefCount() > 0) { |
| 232 | + if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) { |
| 233 | + this.refCount.set(-1000 - this.getRefCount()); |
| 234 | + this.release(); |
| 235 | + } |
| 236 | + } |
| 237 | +} |
| 238 | +``` |
| 239 | + |
| 240 | +2> 判断是否清理完成 |
| 241 | + |
| 242 | +是否清理完成的标准是引用次数小于等于 0 并且清理完成标记 cleanupOver 为 true |
| 243 | + |
| 244 | +```java |
| 245 | +public boolean isCleanupOver() { |
| 246 | + return this.refCount.get() <= 0 && this.cleanupOver; |
| 247 | +} |
| 248 | +``` |
| 249 | + |
| 250 | +3> 关闭文件通道 fileChannel |
| 251 | + |
| 252 | +`this.fileChannel.close();` |
0 commit comments