Skip to content

Commit ffaf2ec

Browse files
authored
feat(rocketmq): add message store process (doocs#111)
1 parent 9eb542f commit ffaf2ec

2 files changed

Lines changed: 192 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@
276276
- [RocketMQ NameServer 与 Broker 的通信](docs/rocketmq/rocketmq-nameserver-broker.md)
277277
- [RocketMQ 生产者启动流程](docs/rocketmq/rocketmq-producer-start.md)
278278
- [RocketMQ 消息发送流程](docs/rocketmq/rocketmq-send-message.md)
279+
- [RocketMQ 消息发送存储流程](docs/rocketmq/rocketmq-send-store.md)
279280

280281
## 番外篇(JDK 1.8)
281282

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
# RocketMQ 消息发送存储流程
2+
3+
第一步:检查消息存储状态
4+
5+
org.apache.rocketmq.store.DefaultMessageStore#checkStoreStatus
6+
7+
1、检查 broker 是否可用
8+
9+
```java
10+
if (this.shutdown) {
11+
log.warn("message store has shutdown, so putMessage is forbidden");
12+
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
13+
}
14+
```
15+
16+
2、检查 broker 的角色
17+
18+
```java
19+
if (BrokerRole.SLAVE== this.messageStoreConfig.getBrokerRole()) {
20+
long value = this.printTimes.getAndIncrement();
21+
if ((value % 50000) == 0) {
22+
log.warn("broke role is slave, so putMessage is forbidden");
23+
}
24+
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
25+
}
26+
```
27+
28+
3、检查 messageStore 是否可写
29+
30+
```java
31+
if (!this.runningFlags.isWriteable()) {
32+
long value = this.printTimes.getAndIncrement();
33+
if ((value % 50000) == 0) {
34+
log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
35+
"the broker's disk is full, write to logic queue error, write to index file error, etc");
36+
}
37+
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
38+
} else {
39+
this.printTimes.set(0);
40+
}
41+
```
42+
43+
4、检查 pageCache
44+
45+
```java
46+
if (this.isOSPageCacheBusy()) {
47+
return PutMessageStatus.OS_PAGECACHE_BUSY;
48+
}
49+
```
50+
51+
第二步:检查消息
52+
53+
org.apache.rocketmq.store.DefaultMessageStore#checkMessage
54+
55+
1、校验主题的长度不能大于 127
56+
57+
```java
58+
if (msg.getTopic().length() > Byte.MAX_VALUE) {
59+
log.warn("putMessage message topic length too long " + msg.getTopic().length());
60+
return PutMessageStatus.MESSAGE_ILLEGAL;
61+
}
62+
```
63+
64+
2、校验属性的长度不能大于 32767
65+
66+
```java
67+
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
68+
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
69+
return PutMessageStatus.MESSAGE_ILLEGAL;
70+
}
71+
```
72+
73+
第三步:获取当前可以写入的 CommitLog 文件
74+
75+
CommitLog 文件的存储目录为${ROCKET_HOME}/store/commitlog ,MappedFileQueue 对应此文件夹,MappedFile 对应文件夹下的文件
76+
77+
```java
78+
msg.setStoreTimestamp(beginLockTimestamp);
79+
80+
if (null == mappedFile || mappedFile.isFull()) {
81+
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
82+
}
83+
if (null == mappedFile) {
84+
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
85+
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
86+
}
87+
```
88+
89+
如果是第一次写入或者最新偏移量所属文件已满,创建新的文件
90+
91+
```java
92+
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
93+
long createOffset = -1;
94+
MappedFile mappedFileLast = getLastMappedFile();
95+
96+
if (mappedFileLast == null) {
97+
createOffset = startOffset - (startOffset % this.mappedFileSize);
98+
}
99+
100+
if (mappedFileLast != null && mappedFileLast.isFull()) {
101+
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
102+
}
103+
104+
if (createOffset != -1 && needCreate) {
105+
return tryCreateMappedFile(createOffset);
106+
}
107+
108+
return mappedFileLast;
109+
}
110+
```
111+
112+
第四步:将消息写入到 MappedFile 中
113+
114+
```java
115+
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
116+
PutMessageContext putMessageContext) {
117+
assert messageExt != null;
118+
assert cb != null;
119+
120+
int currentPos = this.wrotePosition.get();
121+
122+
if (currentPos < this.fileSize) {
123+
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
124+
byteBuffer.position(currentPos);
125+
AppendMessageResult result;
126+
if (messageExt instanceof MessageExtBrokerInner) {
127+
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
128+
(MessageExtBrokerInner) messageExt, putMessageContext);
129+
} else if (messageExt instanceof MessageExtBatch) {
130+
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
131+
(MessageExtBatch) messageExt, putMessageContext);
132+
} else {
133+
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
134+
}
135+
this.wrotePosition.addAndGet(result.getWroteBytes());
136+
this.storeTimestamp = result.getStoreTimestamp();
137+
return result;
138+
}
139+
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
140+
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
141+
}
142+
```
143+
144+
org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner, org.apache.rocketmq.store.CommitLog.PutMessageContext)
145+
146+
计算要写入的偏移量
147+
148+
`long wroteOffset = fileFromOffset + byteBuffer.position();`
149+
150+
对事务消息做特殊处理:
151+
152+
```java
153+
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
154+
switch (tranType) {
155+
// Prepared and Rollback message is not consumed, will not enter the
156+
// consumer queue
157+
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
158+
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
159+
queueOffset = 0L;
160+
break;
161+
case MessageSysFlag.TRANSACTION_NOT_TYPE:
162+
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
163+
default:
164+
break;
165+
}
166+
```
167+
168+
构造 AppendMessageResult:
169+
170+
```java
171+
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
172+
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
173+
```
174+
175+
事务消息特殊处理:
176+
177+
```java
178+
switch (tranType) {
179+
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
180+
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
181+
break;
182+
case MessageSysFlag.TRANSACTION_NOT_TYPE:
183+
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
184+
// The next update ConsumeQueue information
185+
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
186+
CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
187+
break;
188+
default:
189+
break;
190+
}
191+
```

0 commit comments

Comments
 (0)