|
| 1 | +# RocketMQ CommitLog 详解 |
| 2 | + |
| 3 | +commitlog 目录主要用于存储消息。为了保证性能,消息采用顺序写入;每条消息的长度各不相同,每条消息的前 4 个字节存储该条消息的总长度。每个文件大小默认为 1G,文件以其在 commitLog 中的起始偏移量命名,可以通过修改 broker 配置文件中的 mappedFileSizeCommitLog 属性改变文件大小。 |
| 4 | + |
| 5 | +1、获取最小偏移量 |
| 6 | + |
| 7 | +org.apache.rocketmq.store.CommitLog#getMinOffset |
| 8 | + |
| 9 | +```java |
| 10 | +public long getMinOffset() { |
| 11 | + MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile(); |
| 12 | + if (mappedFile != null) { |
| 13 | + if (mappedFile.isAvailable()) { |
| 14 | + return mappedFile.getFileFromOffset(); |
| 15 | + } else { |
| 16 | + return this.rollNextFile(mappedFile.getFileFromOffset()); |
| 17 | + } |
| 18 | + } |
| 19 | + |
| 20 | + return -1; |
| 21 | +} |
| 22 | +``` |
| 23 | + |
| 24 | +获取目录下的第一个文件 |
| 25 | + |
| 26 | +```java |
| 27 | +public MappedFile getFirstMappedFile() { |
| 28 | + MappedFile mappedFileFirst = null; |
| 29 | + |
| 30 | + if (!this.mappedFiles.isEmpty()) { |
| 31 | + try { |
| 32 | + mappedFileFirst = this.mappedFiles.get(0); |
| 33 | + } catch (IndexOutOfBoundsException e) { |
| 34 | + //ignore |
| 35 | + } catch (Exception e) { |
| 36 | + log.error("getFirstMappedFile has exception.", e); |
| 37 | + } |
| 38 | + } |
| 39 | + |
| 40 | + return mappedFileFirst; |
| 41 | +} |
| 42 | +``` |
| 43 | + |
| 44 | +如果该文件可用,返回该文件的起始偏移量;否则通过 rollNextFile 计算并返回下一个文件的起始偏移量 |
| 45 | + |
| 46 | +```java |
| 47 | +public long rollNextFile(final long offset) { |
| 48 | + int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); |
| 49 | + return offset + mappedFileSize - offset % mappedFileSize; |
| 50 | +} |
| 51 | +``` |
| 52 | + |
| 53 | +2、根据偏移量和消息长度查找消息 |
| 54 | + |
| 55 | +org.apache.rocketmq.store.CommitLog#getMessage |
| 56 | + |
| 57 | +```java |
| 58 | +public SelectMappedBufferResult getMessage(final long offset, final int size) { |
| 59 | + int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); |
| 60 | + MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); |
| 61 | + if (mappedFile != null) { |
| 62 | + int pos = (int) (offset % mappedFileSize); |
| 63 | + return mappedFile.selectMappedBuffer(pos, size); |
| 64 | + } |
| 65 | + return null; |
| 66 | +} |
| 67 | +``` |
| 68 | + |
| 69 | +首先获取 commitLog 文件大小,默认 1G |
| 70 | + |
| 71 | +`private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;` |
| 72 | + |
| 73 | +获取偏移量所在的 MappedFile |
| 74 | + |
| 75 | +org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean) |
| 76 | + |
| 77 | +获取第一个 MappedFile 和最后一个 MappedFile,校验偏移量是否落在这两个 MappedFile 覆盖的范围内;若在范围内,则计算偏移量所在文件在 mappedFiles 中的索引:index = offset / mappedFileSize - 第一个文件的起始偏移量 / mappedFileSize。如果按索引取到的文件不包含该偏移量,则遍历 mappedFiles 逐个查找 |
| 78 | + |
| 79 | +```java |
| 80 | +if (firstMappedFile != null && lastMappedFile != null) { |
| 81 | + if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { |
| 82 | + LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", |
| 83 | + offset, |
| 84 | + firstMappedFile.getFileFromOffset(), |
| 85 | + lastMappedFile.getFileFromOffset() + this.mappedFileSize, |
| 86 | + this.mappedFileSize, |
| 87 | + this.mappedFiles.size()); |
| 88 | + } else { |
| 89 | + int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); |
| 90 | + MappedFile targetFile = null; |
| 91 | + try { |
| 92 | + targetFile = this.mappedFiles.get(index); |
| 93 | + } catch (Exception ignored) { |
| 94 | + } |
| 95 | + |
| 96 | + if (targetFile != null && offset >= targetFile.getFileFromOffset() |
| 97 | + && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { |
| 98 | + return targetFile; |
| 99 | + } |
| 100 | + |
| 101 | + for (MappedFile tmpMappedFile : this.mappedFiles) { |
| 102 | + if (offset >= tmpMappedFile.getFileFromOffset() |
| 103 | + && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { |
| 104 | + return tmpMappedFile; |
| 105 | + } |
| 106 | + } |
| 107 | + } |
| 108 | + |
| 109 | + if (returnFirstOnNotFound) { |
| 110 | + return firstMappedFile; |
| 111 | + } |
| 112 | +} |
| 113 | +``` |
| 114 | + |
| 115 | +根据在文件内的偏移量和消息长度获取消息内容 |
| 116 | + |
| 117 | +```java |
| 118 | +public SelectMappedBufferResult selectMappedBuffer(int pos, int size) { |
| 119 | + int readPosition = getReadPosition(); |
| 120 | + if ((pos + size) <= readPosition) { |
| 121 | + if (this.hold()) { |
| 122 | + ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); |
| 123 | + byteBuffer.position(pos); |
| 124 | + ByteBuffer byteBufferNew = byteBuffer.slice(); |
| 125 | + byteBufferNew.limit(size); |
| 126 | + return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); |
| 127 | + } else { |
| 128 | + log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " |
| 129 | + + this.fileFromOffset); |
| 130 | + } |
| 131 | + } else { |
| 132 | + log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size |
| 133 | + + ", fileFromOffset: " + this.fileFromOffset); |
| 134 | + } |
| 135 | + |
| 136 | + return null; |
| 137 | +} |
| 138 | +``` |
| 139 | + |
| 140 | +3、Broker 正常停止文件恢复 |
| 141 | + |
| 142 | +org.apache.rocketmq.store.CommitLog#recoverNormally |
| 143 | + |
| 144 | +首先查询消息是否验证 CRC |
| 145 | + |
| 146 | +`boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();` |
| 147 | + |
| 148 | +从倒数第 3 个文件开始恢复,如果不足 3 个文件,则从第一个文件开始恢复 |
| 149 | + |
| 150 | +```java |
| 151 | +int index = mappedFiles.size() - 3; |
| 152 | +if (index < 0) |
| 153 | + index = 0; |
| 154 | +``` |
| 155 | + |
| 156 | +循环遍历 CommitLog 文件,每次取出一条消息 |
| 157 | + |
| 158 | +`DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);` |
| 159 | + |
| 160 | +如果查找结果为 true 并且消息的长度大于 0,表示消息正确,mappedFileOffset 指针向前移动本条消息的长度; |
| 161 | + |
| 162 | +```java |
| 163 | +if (dispatchRequest.isSuccess() && size > 0) { |
| 164 | + mappedFileOffset += size; |
| 165 | +} |
| 166 | +``` |
| 167 | + |
| 168 | +如果查找结果为 true 并且消息的长度等于 0,表示已到文件的末尾,如果还有下一个文件,则重置 processOffset、mappedFileOffset 并重复上述步骤,否则跳出循环; |
| 169 | + |
| 170 | +```java |
| 171 | +else if (dispatchRequest.isSuccess() && size == 0) { |
| 172 | + index++; |
| 173 | + if (index >= mappedFiles.size()) { |
| 174 | + // Current branch can not happen |
| 175 | + log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName()); |
| 176 | + break; |
| 177 | + } else { |
| 178 | + mappedFile = mappedFiles.get(index); |
| 179 | + byteBuffer = mappedFile.sliceByteBuffer(); |
| 180 | + processOffset = mappedFile.getFileFromOffset(); |
| 181 | + mappedFileOffset = 0; |
| 182 | + log.info("recover next physics file, " + mappedFile.getFileName()); |
| 183 | + } |
| 184 | +} |
| 185 | +``` |
| 186 | + |
| 187 | +如果查找结果为 false,则表示消息没有填满该文件,跳出循环,结束遍历 |
| 188 | + |
| 189 | +```java |
| 190 | +else if (!dispatchRequest.isSuccess()) { |
| 191 | + log.info("recover physics file end, " + mappedFile.getFileName()); |
| 192 | + break; |
| 193 | +} |
| 194 | +``` |
| 195 | + |
| 196 | +更新 flushedWhere 和 committedWhere 指针 |
| 197 | + |
| 198 | +```java |
| 199 | +this.mappedFileQueue.setFlushedWhere(processOffset); |
| 200 | +this.mappedFileQueue.setCommittedWhere(processOffset); |
| 201 | +``` |
| 202 | + |
| 203 | +删除 offset 之后的所有文件。遍历目录下面的所有文件,如果文件尾部偏移量不大于 offset 则跳过该文件;如果尾部的偏移量大于 offset,则进一步比较 offset 与文件的开始偏移量:如果 offset 大于等于文件的开始偏移量,说明当前文件包含了有效偏移量,将 MappedFile 的 wrotePosition、committedPosition 和 flushedPosition 都设置为 offset % mappedFileSize。 |
| 204 | + |
| 205 | +如果 offset 小于文件的开始偏移量,说明该文件是在有效文件之后创建的,调用 MappedFile#destroy() 方法释放资源,并将该文件加入 willRemoveFiles 待移除列表 |
| 206 | + |
| 207 | +```java |
| 208 | +if (fileTailOffset > offset) { |
| 209 | + if (offset >= file.getFileFromOffset()) { |
| 210 | + file.setWrotePosition((int) (offset % this.mappedFileSize)); |
| 211 | + file.setCommittedPosition((int) (offset % this.mappedFileSize)); |
| 212 | + file.setFlushedPosition((int) (offset % this.mappedFileSize)); |
| 213 | + } else { |
| 214 | + file.destroy(1000); |
| 215 | + willRemoveFiles.add(file); |
| 216 | + } |
| 217 | +} |
| 218 | +``` |
| 219 | + |
| 220 | +释放资源时先关闭 MappedFile(shutdown),确认清理完成后再关闭文件通道 fileChannel 并删除物理文件;若清理未完成则返回 false |
| 221 | + |
| 222 | +```java |
| 223 | +public boolean destroy(final long intervalForcibly) { |
| 224 | + this.shutdown(intervalForcibly); |
| 225 | + |
| 226 | + if (this.isCleanupOver()) { |
| 227 | + try { |
| 228 | + this.fileChannel.close(); |
| 229 | + log.info("close file channel " + this.fileName + " OK"); |
| 230 | + |
| 231 | + long beginTime = System.currentTimeMillis(); |
| 232 | + boolean result = this.file.delete(); |
| 233 | + log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName |
| 234 | + + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" |
| 235 | + + this.getFlushedPosition() + ", " |
| 236 | + + UtilAll.computeElapsedTimeMilliseconds(beginTime)); |
| 237 | + } catch (Exception e) { |
| 238 | + log.warn("close file channel " + this.fileName + " Failed. ", e); |
| 239 | + } |
| 240 | + |
| 241 | + return true; |
| 242 | + } else { |
| 243 | + log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName |
| 244 | + + " Failed. cleanupOver: " + this.cleanupOver); |
| 245 | + } |
| 246 | + |
| 247 | + return false; |
| 248 | +} |
| 249 | +``` |
| 250 | + |
| 251 | +判断`maxPhyOffsetOfConsumeQueue`是否大于等于 processOffset,如果是,需要删除 ConsumeQueue 中 processOffset 之后的数据 |
| 252 | + |
| 253 | +```java |
| 254 | +if (maxPhyOffsetOfConsumeQueue >= processOffset) { |
| 255 | + log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset); |
| 256 | + this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); |
| 257 | +} |
| 258 | +``` |
| 259 | + |
| 260 | +```java |
| 261 | +public void truncateDirtyLogicFiles(long phyOffset) { |
| 262 | + ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; |
| 263 | + |
| 264 | + for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { |
| 265 | + for (ConsumeQueue logic : maps.values()) { |
| 266 | + logic.truncateDirtyLogicFiles(phyOffset); |
| 267 | + } |
| 268 | + } |
| 269 | +} |
| 270 | +``` |
0 commit comments