Skip to content

Commit 2fa89cb

Browse files
feat: add RocketMQ ConsumeQueue Detail (doocs#113)
* Add sendMessage process * Update rocketmq-send-message.md * Add ConsumeQueue Detail Co-authored-by: Yang Libin <contact@yanglibin.info>
1 parent af7d542 commit 2fa89cb

2 files changed

Lines changed: 198 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@
278278
- [RocketMQ 消息发送流程](docs/rocketmq/rocketmq-send-message.md)
279279
- [RocketMQ 消息发送存储流程](docs/rocketmq/rocketmq-send-store.md)
280280
- [RocketMQ MappedFile内存映射文件详解](docs/rocketmq/rocketmq-mappedfile-detail.md)
281+
- [RocketMQ ConsumeQueue详解](docs/rocketmq/rocketmq-consumequeue.md)
281282

282283
## 番外篇(JDK 1.8)
283284

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
# RocketMQ ConsumeQueue 详解
2+
3+
RocketMQ 基于主题订阅模式实现消息消费,消费者关注每一个主题下的所有消息,但是同一主题下的消息是不连续地存储在 CommitLog 文件中的,如果消费者直接从消息存储文件中遍历查找主题下的消息,效率会特别低。所以为了在查找消息的时候效率更高一些,设计了 ConsumeQueue 文件,可以看作 CommitLog 消费的目录文件.
4+
5+
ConsumeQueue 的第一级目录为消息主题名称,第二级目录为主题的队列 id
6+
7+
为了加速 ConsumeQueue 消息的查询速度并节省磁盘空间,不会存储消息的全量信息,只会 存储一些 关键信息,如 8 字节的 CommmitLog 偏移量、4 字节的文件大小、8 字节的 tag 哈希码
8+
9+
1、根据消息存储时间查找物理偏移量:
10+
11+
org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime
12+
13+
第一步:根据时间戳定位物理文件
14+
15+
```java
16+
public MappedFile getMappedFileByTime(final long timestamp) {
17+
Object[] mfs = this.copyMappedFiles(0);
18+
19+
if (null == mfs)
20+
return null;
21+
22+
for (int i = 0; i < mfs.length; i++) {
23+
MappedFile mappedFile = (MappedFile) mfs[i];
24+
if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
25+
return mappedFile;
26+
}
27+
}
28+
29+
return (MappedFile) mfs[mfs.length - 1];
30+
}
31+
```
32+
33+
从第一个文件 开始,找到第一个更新时间大于该时间戳的文件
34+
35+
第二步:利用二分查找法来加速检索
36+
37+
计算最低查找偏移量,如果消息队列偏移量大于文件的偏移量,则最低偏移量等于消息队列偏移量减去文件的偏移量,反之为 0
38+
39+
`int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;`
40+
41+
计算中间偏移量,其中*`CQ_STORE_UNIT_SIZE` =* 8 字节的 CommmitLog 偏移量 + 4 字节的文件大小+8 字节的 tag 哈希码
42+
43+
`midOffset = (low + high) / (2 * *CQ_STORE_UNIT_SIZE*) * *CQ_STORE_UNIT_SIZE*;`
44+
45+
如果得到的物理偏移量小于当前最小物理偏移量,则待查找消息的物理偏移量大于 midOffset,将 low 设置为 midOffset,继续查询
46+
47+
```java
48+
byteBuffer.position(midOffset);
49+
long phyOffset = byteBuffer.getLong();
50+
int size = byteBuffer.getInt();
51+
if (phyOffset < minPhysicOffset) {
52+
low = midOffset +CQ_STORE_UNIT_SIZE;
53+
leftOffset = midOffset;
54+
continue;
55+
}
56+
```
57+
58+
如果得到的物理偏移量大于最小物理偏移量,说明该消息为有效信息,则根据消息物理偏移量和消息长度获取消息存储的时间戳
59+
60+
```java
61+
long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
62+
```
63+
64+
如果存储时间小于 0,则为无效消息,返回 0;
65+
66+
如果存储时间戳等于待查找时间戳,说明查找到了目标消息,设置 targetOffset,跳出循环;
67+
68+
如果存储时间戳大于待查找时间戳,说明待查找消息的物理偏移量小于 midOffset,设置 high 为 midOffset,设置 rightIndexValue 等于 storeTime,设置 rightOffset 为 midOffset;
69+
70+
如果存储时间戳小于待查找时间戳,说明待查找消息的物理偏移量大于 midOffset,设置 low 为 midOffset,设置 leftIndexValue 等于 storeTime,设置 leftOffset 为 midOffset
71+
72+
```java
73+
if (storeTime < 0) {
74+
return 0;
75+
} else if (storeTime == timestamp) {
76+
targetOffset = midOffset;
77+
break;
78+
} else if (storeTime > timestamp) {
79+
high = midOffset -CQ_STORE_UNIT_SIZE;
80+
rightOffset = midOffset;
81+
rightIndexValue = storeTime;
82+
} else {
83+
low = midOffset +CQ_STORE_UNIT_SIZE;
84+
leftOffset = midOffset;
85+
leftIndexValue = storeTime;
86+
}
87+
```
88+
89+
如果 targetOffset 不等于-1,表示找到了存储时间戳等于待查找时间戳的消息;
90+
91+
如果 leftIndexValue 等于-1,返回大于并且最接近待查找消息的时间戳的偏移量
92+
93+
如果 rightIndexValue 等于-1,返回小于并且最接近待查找消息的时间戳的偏移量
94+
95+
```java
96+
if (targetOffset != -1) {
97+
98+
offset = targetOffset;
99+
} else {
100+
if (leftIndexValue == -1) {
101+
offset = rightOffset;
102+
} else if (rightIndexValue == -1) {
103+
offset = leftOffset;
104+
} else {
105+
offset = Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - rightIndexValue) ? rightOffset : leftOffset;
106+
}
107+
}
108+
```
109+
110+
2、根据当前偏移量获取下一个文件的偏移量
111+
112+
org.apache.rocketmq.store.ConsumeQueue#rollNextFile
113+
114+
```java
115+
public long rollNextFile(final long index) {
116+
int mappedFileSize = this.mappedFileSize;
117+
int totalUnitsInFile = mappedFileSize /CQ_STORE_UNIT_SIZE;
118+
return index + totalUnitsInFile - index % totalUnitsInFile;
119+
}
120+
```
121+
122+
3、ConsumeQueue 添加消息
123+
124+
org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
125+
126+
将消息偏移量、消息长度、tag 哈希码写入 ByteBuffer,将内容追加到 ConsumeQueue 的内存映射文件中。
127+
128+
```java
129+
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
130+
final long cqOffset) {
131+
132+
if (offset + size <= this.maxPhysicOffset) {
133+
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
134+
return true;
135+
}
136+
137+
this.byteBufferIndex.flip();
138+
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
139+
this.byteBufferIndex.putLong(offset);
140+
this.byteBufferIndex.putInt(size);
141+
this.byteBufferIndex.putLong(tagsCode);
142+
143+
final long expectLogicOffset = cqOffset *CQ_STORE_UNIT_SIZE;
144+
145+
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
146+
if (mappedFile != null) {
147+
148+
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
149+
this.minLogicOffset = expectLogicOffset;
150+
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
151+
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
152+
this.fillPreBlank(mappedFile, expectLogicOffset);
153+
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
154+
+ mappedFile.getWrotePosition());
155+
}
156+
157+
if (cqOffset != 0) {
158+
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
159+
160+
if (expectLogicOffset < currentLogicOffset) {
161+
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
162+
return true;
163+
}
164+
165+
if (expectLogicOffset != currentLogicOffset) {
166+
LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
167+
expectLogicOffset,
168+
currentLogicOffset,
169+
this.topic,
170+
this.queueId,
171+
expectLogicOffset - currentLogicOffset
172+
);
173+
}
174+
}
175+
this.maxPhysicOffset = offset + size;
176+
return mappedFile.appendMessage(this.byteBufferIndex.array());
177+
}
178+
return false;
179+
}
180+
```
181+
182+
4、ConsumeQueue 文件删除
183+
184+
org.apache.rocketmq.store.ConsumeQueue#destroy
185+
186+
重置 ConsumeQueue 的 maxPhysicOffset 与 minLogicOffset,调用 MappedFileQueue 的 destroy()方法将 ConsumeQueue 目录下的文件全部删除
187+
188+
```java
189+
public void destroy() {
190+
this.maxPhysicOffset = -1;
191+
this.minLogicOffset = 0;
192+
this.mappedFileQueue.destroy();
193+
if (isExtReadEnable()) {
194+
this.consumeQueueExt.destroy();
195+
}
196+
}
197+
```

0 commit comments

Comments
 (0)