Skip to content

Commit b897d6d

Browse files
authored
feat: add commitLog detail (doocs#114)
* add CommitLog detail
1 parent 2fa89cb commit b897d6d

2 files changed

Lines changed: 271 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@
279279
- [RocketMQ 消息发送存储流程](docs/rocketmq/rocketmq-send-store.md)
280280
- [RocketMQ MappedFile内存映射文件详解](docs/rocketmq/rocketmq-mappedfile-detail.md)
281281
- [RocketMQ ConsumeQueue详解](docs/rocketmq/rocketmq-consumequeue.md)
282+
- [RocketMQ CommitLog详解](docs/rocketmq/rocketmq-commitlog.md)
282283

283284
## 番外篇(JDK 1.8)
284285

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
# RocketMQ CommitLog 详解
2+
3+
commitlog 目录主要存储消息,为了保证性能,顺序写入,每一条消息的长度都不相同,每条消息的前面四个字节存储该条消息的总长度,每个文件大小默认为 1G,文件的命名是以 commitLog 起始偏移量命名的,可以通过修改 broker 配置文件中 mappedFileSizeCommitLog 属性改变文件大小
4+
5+
1、获取最小偏移量
6+
7+
org.apache.rocketmq.store.CommitLog#getMinOffset
8+
9+
```java
10+
public long getMinOffset() {
11+
MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
12+
if (mappedFile != null) {
13+
if (mappedFile.isAvailable()) {
14+
return mappedFile.getFileFromOffset();
15+
} else {
16+
return this.rollNextFile(mappedFile.getFileFromOffset());
17+
}
18+
}
19+
20+
return -1;
21+
}
22+
```
23+
24+
获取目录下的第一个文件
25+
26+
```java
27+
public MappedFile getFirstMappedFile() {
28+
MappedFile mappedFileFirst = null;
29+
30+
if (!this.mappedFiles.isEmpty()) {
31+
try {
32+
mappedFileFirst = this.mappedFiles.get(0);
33+
} catch (IndexOutOfBoundsException e) {
34+
//ignore
35+
} catch (Exception e) {
36+
log.error("getFirstMappedFile has exception.", e);
37+
}
38+
}
39+
40+
return mappedFileFirst;
41+
}
42+
```
43+
44+
如果该文件可用返回文件的起始偏移量,否则返回下一个文件的 起始偏移量
45+
46+
```java
47+
public long rollNextFile(final long offset) {
48+
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
49+
return offset + mappedFileSize - offset % mappedFileSize;
50+
}
51+
```
52+
53+
2、根据偏移量和消息长度查找消息
54+
55+
org.apache.rocketmq.store.CommitLog#getMessage
56+
57+
```java
58+
public SelectMappedBufferResult getMessage(final long offset, final int size) {
59+
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
60+
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
61+
if (mappedFile != null) {
62+
int pos = (int) (offset % mappedFileSize);
63+
return mappedFile.selectMappedBuffer(pos, size);
64+
}
65+
return null;
66+
}
67+
```
68+
69+
首先获取 commitLog 文件大小,默认 1G
70+
71+
`private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;`
72+
73+
获取偏移量所在的 MappedFile
74+
75+
org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
76+
77+
获取第一个 MappedFile 和最后一个 MappedFile,校验偏移量是否在这两个 MappedFile 之间,计算当前偏移量所在 MappedFiles 索引值为当前偏移量的索引减去第一个文件的索引值
78+
79+
```java
80+
if (firstMappedFile != null && lastMappedFile != null) {
81+
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
82+
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
83+
offset,
84+
firstMappedFile.getFileFromOffset(),
85+
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
86+
this.mappedFileSize,
87+
this.mappedFiles.size());
88+
} else {
89+
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
90+
MappedFile targetFile = null;
91+
try {
92+
targetFile = this.mappedFiles.get(index);
93+
} catch (Exception ignored) {
94+
}
95+
96+
if (targetFile != null && offset >= targetFile.getFileFromOffset()
97+
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
98+
return targetFile;
99+
}
100+
101+
for (MappedFile tmpMappedFile : this.mappedFiles) {
102+
if (offset >= tmpMappedFile.getFileFromOffset()
103+
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
104+
return tmpMappedFile;
105+
}
106+
}
107+
}
108+
109+
if (returnFirstOnNotFound) {
110+
return firstMappedFile;
111+
}
112+
}
113+
```
114+
115+
根据在文件内的偏移量和消息长度获取消息内容
116+
117+
```java
118+
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
119+
int readPosition = getReadPosition();
120+
if ((pos + size) <= readPosition) {
121+
if (this.hold()) {
122+
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
123+
byteBuffer.position(pos);
124+
ByteBuffer byteBufferNew = byteBuffer.slice();
125+
byteBufferNew.limit(size);
126+
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
127+
} else {
128+
log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
129+
+ this.fileFromOffset);
130+
}
131+
} else {
132+
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
133+
+ ", fileFromOffset: " + this.fileFromOffset);
134+
}
135+
136+
return null;
137+
}
138+
```
139+
140+
3、Broker 正常停止文件恢复
141+
142+
org.apache.rocketmq.store.CommitLog#recoverNormally
143+
144+
首先查询消息是否验证 CRC
145+
146+
`boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();`
147+
148+
从倒数第 3 个文件开始恢复,如果不足 3 个文件,则从第一个文件开始恢复
149+
150+
```java
151+
int index = mappedFiles.size() - 3;
152+
if (index < 0)
153+
index = 0;
154+
```
155+
156+
循环遍历 CommitLog 文件,每次取出一条消息
157+
158+
`DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);`
159+
160+
如果查找结果为 true 并且消息的长度大于 0,表示消息正确,mappedFileOffset 指针向前移动本条消息的长度;
161+
162+
```java
163+
if (dispatchRequest.isSuccess() && size > 0) {
164+
mappedFileOffset += size;
165+
}
166+
```
167+
168+
如果查找结果为 true 并且结果等于 0,表示已到文件 的末尾,如果还有下一个文件,则重置 processOffset、mappedOffset 并重复上述步骤,否则跳出循环;
169+
170+
```java
171+
else if (dispatchRequest.isSuccess() && size == 0) {
172+
index++;
173+
if (index >= mappedFiles.size()) {
174+
// Current branch can not happen
175+
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
176+
break;
177+
} else {
178+
mappedFile = mappedFiles.get(index);
179+
byteBuffer = mappedFile.sliceByteBuffer();
180+
processOffset = mappedFile.getFileFromOffset();
181+
mappedFileOffset = 0;
182+
log.info("recover next physics file, " + mappedFile.getFileName());
183+
}
184+
}
185+
```
186+
187+
如果查找结果为 false,则表示消息没有填满该文件,跳出循环,结束遍历
188+
189+
```java
190+
else if (!dispatchRequest.isSuccess()) {
191+
log.info("recover physics file end, " + mappedFile.getFileName());
192+
break;
193+
}
194+
```
195+
196+
更新 committedPosition 和 flushedWhere 指针
197+
198+
```java
199+
this.mappedFileQueue.setFlushedWhere(processOffset);
200+
this.mappedFileQueue.setCommittedWhere(processOffset);
201+
```
202+
203+
删除 offset 之后的所有文件。遍历目录下面的所有文件,如果文件尾部偏移量小于 offset 则跳过该文件,如果尾部的偏移量大于 offset,则进一步比较 offset 与文件的开始偏移量,如果 offset 大于文件的开始偏移量,说明当前文件包含了有效偏移量,设置 MappedFile 的 flushPosition 和 commitedPosition。
204+
205+
如果 offset 小于文件的开始偏移量,说明该文件是有效文件后面创建的,调用 MappedFile#destroy()方法释放资源
206+
207+
```java
208+
if (fileTailOffset > offset) {
209+
if (offset >= file.getFileFromOffset()) {
210+
file.setWrotePosition((int) (offset % this.mappedFileSize));
211+
file.setCommittedPosition((int) (offset % this.mappedFileSize));
212+
file.setFlushedPosition((int) (offset % this.mappedFileSize));
213+
} else {
214+
file.destroy(1000);
215+
willRemoveFiles.add(file);
216+
}
217+
}
218+
```
219+
220+
释放资源需要关闭 MappedFile 和文件通道 fileChannel
221+
222+
```java
223+
public boolean destroy(final long intervalForcibly) {
224+
this.shutdown(intervalForcibly);
225+
226+
if (this.isCleanupOver()) {
227+
try {
228+
this.fileChannel.close();
229+
log.info("close file channel " + this.fileName + " OK");
230+
231+
long beginTime = System.currentTimeMillis();
232+
boolean result = this.file.delete();
233+
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
234+
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
235+
+ this.getFlushedPosition() + ", "
236+
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
237+
} catch (Exception e) {
238+
log.warn("close file channel " + this.fileName + " Failed. ", e);
239+
}
240+
241+
return true;
242+
} else {
243+
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
244+
+ " Failed. cleanupOver: " + this.cleanupOver);
245+
}
246+
247+
return false;
248+
}
249+
```
250+
251+
判断`maxPhyOffsetOfConsumeQueue`是否大于 processOffset,如果大于,需要删除 ConsumeQueue 中 processOffset 之后的数据
252+
253+
```java
254+
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
255+
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
256+
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
257+
}
258+
```
259+
260+
```java
261+
public void truncateDirtyLogicFiles(long phyOffset) {
262+
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
263+
264+
for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
265+
for (ConsumeQueue logic : maps.values()) {
266+
logic.truncateDirtyLogicFiles(phyOffset);
267+
}
268+
}
269+
}
270+
```

0 commit comments

Comments
 (0)