Skip to content

Commit af7d542

Browse files
authored
feat: add RocketMQ MappedFile Detail (doocs#112)
1 parent ffaf2ec commit af7d542

2 files changed

Lines changed: 253 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@
277277
- [RocketMQ 生产者启动流程](docs/rocketmq/rocketmq-producer-start.md)
278278
- [RocketMQ 消息发送流程](docs/rocketmq/rocketmq-send-message.md)
279279
- [RocketMQ 消息发送存储流程](docs/rocketmq/rocketmq-send-store.md)
280+
- [RocketMQ MappedFile内存映射文件详解](docs/rocketmq/rocketmq-mappedfile-detail.md)
280281

281282
## 番外篇(JDK 1.8)
282283

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
# RocketMQ MappedFile 内存映射文件详解
2+
3+
1、MappedFile 初始化
4+
5+
```java
6+
private void init(final String fileName, final int fileSize) throws IOException {
7+
this.fileName = fileName;
8+
this.fileSize = fileSize;
9+
this.file = new File(fileName);
10+
this.fileFromOffset = Long.parseLong(this.file.getName());
11+
boolean ok = false;
12+
13+
ensureDirOK(this.file.getParent());
14+
15+
try {
16+
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
17+
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
18+
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
19+
TOTAL_MAPPED_FILES.incrementAndGet();
20+
ok = true;
21+
} catch (FileNotFoundException e) {
22+
log.error("Failed to create file " + this.fileName, e);
23+
throw e;
24+
} catch (IOException e) {
25+
log.error("Failed to map file " + this.fileName, e);
26+
throw e;
27+
} finally {
28+
if (!ok && this.fileChannel != null) {
29+
this.fileChannel.close();
30+
}
31+
}
32+
}
33+
```
34+
35+
初始化`fileFromOffset`,因为 commitLog 文件夹下的文件都是以偏移量为命名的,所以转成了 long 类型
36+
37+
确认文件目录是否存在,不存在则创建
38+
39+
```java
40+
public static void ensureDirOK(final String dirName) {
41+
if (dirName != null) {
42+
if (dirName.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
43+
String[] dirs = dirName.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
44+
for (String dir : dirs) {
45+
createDirIfNotExist(dir);
46+
}
47+
} else {
48+
createDirIfNotExist(dirName);
49+
}
50+
}
51+
}
52+
```
53+
54+
通过`RandomAccessFile`设置 fileChannel
55+
56+
`this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();`
57+
58+
使用 NIO 内存映射将文件映射到内存中
59+
60+
`this.mappedByteBuffer = this.fileChannel.map(MapMode.*READ_WRITE*, 0, fileSize);`
61+
62+
2、MappedFile 提交
63+
64+
```java
65+
public int commit(final int commitLeastPages) {
66+
if (writeBuffer == null) {
67+
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
68+
return this.wrotePosition.get();
69+
}
70+
if (this.isAbleToCommit(commitLeastPages)) {
71+
if (this.hold()) {
72+
commit0();
73+
this.release();
74+
} else {
75+
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
76+
}
77+
}
78+
79+
// All dirty data has been committed to FileChannel.
80+
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
81+
this.transientStorePool.returnBuffer(writeBuffer);
82+
this.writeBuffer = null;
83+
}
84+
85+
return this.committedPosition.get();
86+
}
87+
```
88+
89+
如果 wroteBuffer 为空,直接返回 wrotePosition
90+
91+
```java
92+
if (writeBuffer == null) {
93+
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
94+
return this.wrotePosition.get();
95+
}
96+
```
97+
98+
判断是否执行 commit 操作:
99+
100+
如果文件已满,返回 true
101+
102+
```java
103+
if (this.isFull()) {
104+
return true;
105+
}
106+
```
107+
108+
```java
109+
public boolean isFull() {
110+
return this.fileSize == this.wrotePosition.get();
111+
}
112+
```
113+
114+
commitLeastPages 为本次提交的最小页数,如果 commitLeastPages 大于 0,计算当前写指针(`wrotePosition`)与上一次提交的指针`committedPosition`的差值 除以页*`OS_PAGE_SIZE`*的大小得到脏页数量,如果大于 commitLeastPages,就可以提交。如果 commitLeastPages 小于 0,则存在脏页就提交
115+
116+
```java
117+
if (commitLeastPages > 0) {
118+
return ((write /OS_PAGE_SIZE) - (flush /OS_PAGE_SIZE)) >= commitLeastPages;
119+
}
120+
121+
return write > flush;
122+
```
123+
124+
MapperFile 具体的提交过程,首先创建 `writeBuffer`的共享缓存区,设置 position 为上一次提交的位置`committedPosition` ,设置 limit 为`wrotePosition`当前写指针,接着将 committedPosition 到 wrotePosition 的数据写入到 FileChannel 中,最后更新 committedPosition 指针为 wrotePosition
125+
126+
```java
127+
protected void commit0() {
128+
int writePos = this.wrotePosition.get();
129+
int lastCommittedPosition = this.committedPosition.get();
130+
131+
if (writePos - lastCommittedPosition > 0) {
132+
try {
133+
ByteBuffer byteBuffer = writeBuffer.slice();
134+
byteBuffer.position(lastCommittedPosition);
135+
byteBuffer.limit(writePos);
136+
this.fileChannel.position(lastCommittedPosition);
137+
this.fileChannel.write(byteBuffer);
138+
this.committedPosition.set(writePos);
139+
} catch (Throwable e) {
140+
log.error("Error occurred when commit data to FileChannel.", e);
141+
}
142+
}
143+
}
144+
```
145+
146+
3、MappedFile 刷盘
147+
148+
判断是否要进行刷盘
149+
150+
文件是否已满
151+
152+
```java
153+
if (this.isFull()) {
154+
return true;
155+
}
156+
```
157+
158+
```java
159+
public boolean isFull() {
160+
return this.fileSize == this.wrotePosition.get();
161+
}
162+
```
163+
164+
如果`flushLeastPages`大于 0,判断写数据指针位置-上次刷盘的指针位置, 然后除以*`OS_PAGE_SIZE 是否大于等于`*`flushLeastPages`
165+
166+
如果 flushLeastPages 小于等于 0,判断是否有要刷盘的数据
167+
168+
```java
169+
if (flushLeastPages > 0) {
170+
return ((write /OS_PAGE_SIZE) - (flush /OS_PAGE_SIZE)) >= flushLeastPages;
171+
}
172+
173+
return write > flush;
174+
```
175+
176+
获取最大读指针
177+
178+
```java
179+
public int getReadPosition() {
180+
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
181+
}
182+
```
183+
184+
将数据刷出到磁盘
185+
186+
如果`writeBuffer`不为空或者通道的 position 不等于 0,通过 fileChannel 将数据刷新到磁盘
187+
188+
否则通过 MappedByteBuffer 将数据刷新到磁盘
189+
190+
4、MappedFile 销毁
191+
192+
```java
193+
public boolean destroy(final long intervalForcibly) {
194+
this.shutdown(intervalForcibly);
195+
196+
if (this.isCleanupOver()) {
197+
try {
198+
this.fileChannel.close();
199+
log.info("close file channel " + this.fileName + " OK");
200+
201+
long beginTime = System.currentTimeMillis();
202+
boolean result = this.file.delete();
203+
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
204+
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
205+
+ this.getFlushedPosition() + ", "
206+
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
207+
} catch (Exception e) {
208+
log.warn("close file channel " + this.fileName + " Failed. ", e);
209+
}
210+
211+
return true;
212+
} else {
213+
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
214+
+ " Failed. cleanupOver: " + this.cleanupOver);
215+
}
216+
217+
return false;
218+
}
219+
```
220+
221+
1> 关闭 MappedFile
222+
223+
第一次调用时 this.`available为true`,设置 available 为 false,设置第一次关闭的时间戳为当前时间戳,调用 release()释放资源,只有在引用次数小于 1 的时候才会释放资源,如果引用次数大于 0,判断当前时间与 firstShutdownTimestamp 的差值是否大于最大拒绝存活期`intervalForcibly`,如果大于等于最大拒绝存活期,将引用数减少 1000,直到引用数小于 0 释放资源
224+
225+
```java
226+
public void shutdown(final long intervalForcibly) {
227+
if (this.available) {
228+
this.available = false;
229+
this.firstShutdownTimestamp = System.currentTimeMillis();
230+
this.release();
231+
} else if (this.getRefCount() > 0) {
232+
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
233+
this.refCount.set(-1000 - this.getRefCount());
234+
this.release();
235+
}
236+
}
237+
}
238+
```
239+
240+
2> 判断是否清理完成
241+
242+
是否清理完成的标准是引用次数小于等于 0 并且清理完成标记 cleanupOver 为 true
243+
244+
```java
245+
public boolean isCleanupOver() {
246+
return this.refCount.get() <= 0 && this.cleanupOver;
247+
}
248+
```
249+
250+
3> 关闭文件通道 fileChannel
251+
252+
`this.fileChannel.close();`

0 commit comments

Comments
 (0)