Skip to content

Commit dbe617e

Browse files
authored
feat: add PullMessage process (doocs#118)
* add PullMessage process
1 parent dccf26c commit dbe617e

2 files changed

Lines changed: 326 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@
282282
- [RocketMQ CommitLog详解](docs/rocketmq/rocketmq-commitlog.md)
283283
- [RocketMQ IndexFile详解](docs/rocketmq/rocketmq-indexfile.md)
284284
- [RocketMQ 消费者启动流程](docs/rocketmq/rocketmq-consumer-start.md)
285+
- [RocketMQ 消息拉取流程](docs/rocketmq/rocketmq-pullmessage.md)
285286

286287
## 番外篇(JDK 1.8)
287288

Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
# RocketMQ 消息拉取流程
2+
3+
之前在消费者启动流程中描述过 MQClientInstance 的启动流程,在启动过程中会启动 PullMessageService,它继承了`ServiceThread`,并且 ServiceThread 实现了 Runnable 接口,所以是单独启动了一个线程
4+
5+
`public class PullMessageService extends ServiceThread`
6+
7+
`public abstract class ServiceThread implements Runnable`
8+
9+
PullMessageService 的 run 方法如下:
10+
11+
`protected volatile boolean stopped = false;`
12+
13+
```java
14+
public void run() {
15+
log.info(this.getServiceName() + " service started");
16+
17+
while (!this.isStopped()) {
18+
try {
19+
PullRequest pullRequest = this.pullRequestQueue.take();
20+
this.pullMessage(pullRequest);
21+
} catch (InterruptedException ignored) {
22+
} catch (Exception e) {
23+
log.error("Pull Message Service Run Method exception", e);
24+
}
25+
}
26+
27+
log.info(this.getServiceName() + " service end");
28+
}
29+
```
30+
31+
只要没有停止,线程一直会从 PullRequestQueue 中获取 PullRequest 消息拉取任务,如果队列为空,会一直阻塞,直到有 PullRequest 被放入队列中,如果拿到了 PullRequest 就会调用 pullMessage 方法拉取消息
32+
33+
添加 PullRequest 有两个方法,一个是延迟添加,另一个是立即添加
34+
35+
```java
36+
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
37+
if (!isStopped()) {
38+
this.scheduledExecutorService.schedule(new Runnable() {
39+
@Override
40+
public void run() {
41+
PullMessageService.this.executePullRequestImmediately(pullRequest);
42+
}
43+
}, timeDelay, TimeUnit.MILLISECONDS);
44+
} else {
45+
log.warn("PullMessageServiceScheduledThread has shutdown");
46+
}
47+
}
48+
49+
public void executePullRequestImmediately(final PullRequest pullRequest) {
50+
try {
51+
this.pullRequestQueue.put(pullRequest);
52+
} catch (InterruptedException e) {
53+
log.error("executePullRequestImmediately pullRequestQueue.put", e);
54+
}
55+
}
56+
```
57+
58+
org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage
59+
60+
拉取消息流程:
61+
62+
根据消费组获取`MQConsumerInner`,根据推模式还是拉模式,强转为`DefaultMQPushConsumerImpl`还是`DefaultLitePullConsumerImpl`
63+
64+
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
65+
66+
`第1步:获取处理队列`,`如果队列被丢弃结束`
67+
68+
```java
69+
final ProcessQueue processQueue = pullRequest.getProcessQueue();
70+
71+
if (processQueue.isDropped()) {
72+
log.info("the pull request[{}] is dropped.", pullRequest.toString());
73+
return;
74+
}
75+
```
76+
77+
第 2 步:`设置最后一次拉取时间戳`
78+
79+
`pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());`
80+
81+
第 3 步:`确认消费者是启动的状态,如果不是启动的状态,将PullRequest延迟3s放入队列`
82+
83+
```java
84+
try {
85+
this.makeSureStateOK();
86+
} catch (MQClientException e) {
87+
log.warn("pullMessage exception, consumer state not ok", e);
88+
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
89+
return;
90+
}
91+
```
92+
93+
第 4 步:`如果消费者停止了,将PullRequest延迟1s放入队列`
94+
95+
```java
96+
if (this.isPause()) {
97+
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
98+
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
99+
return;
100+
}
101+
```
102+
103+
第 5 步:`缓存的消息数量大于1000,将PullRequest延迟50ms放入队列,每触发1000次流控输出警告信息`
104+
105+
```java
106+
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
107+
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
108+
if ((queueFlowControlTimes++ % 1000) == 0) {
109+
log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
110+
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
111+
}
112+
return;
113+
}
114+
```
115+
116+
第 6 步:`缓存的消息大小大于100M 将PullRequest延迟50ms放入队列,每触发1000次输出警告信息`
117+
118+
```java
119+
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
120+
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
121+
if ((queueFlowControlTimes++ % 1000) == 0) {
122+
log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
123+
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
124+
}
125+
return;
126+
}
127+
```
128+
129+
第 7 步:`ProcessQueue中消息的最大偏移量与最小偏移量的差值不能大于2000,如果大于2000,触发流控,输出警告信息`
130+
131+
```java
132+
if (!this.consumeOrderly) {
133+
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
134+
this.executePullRequestLater(pullRequest,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
135+
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
136+
log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
137+
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
138+
pullRequest, queueMaxSpanFlowControlTimes);
139+
}
140+
return;
141+
}
142+
}
143+
```
144+
145+
第 8 步:`如果ProcessQueue被锁了,判断上一个PullRequest是否被锁,如果没有被锁通过RebalanceImpl计算拉取消息偏移量,如果计算异常,将请求延迟3s加入队列``如果下一次拉取消息 的偏移量大于计算出来的偏移量,说明要拉取的偏移量 大于消费偏移量,对 偏移量 进行修正,设置下一次拉取的偏移量为计算出来的偏移量`
146+
147+
```java
148+
if (processQueue.isLocked()) {
149+
if (!pullRequest.isPreviouslyLocked()) {
150+
long offset = -1L;
151+
try {
152+
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
153+
} catch (Exception e) {
154+
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
155+
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
156+
return;
157+
}
158+
boolean brokerBusy = offset < pullRequest.getNextOffset();
159+
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
160+
pullRequest, offset, brokerBusy);
161+
if (brokerBusy) {
162+
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
163+
pullRequest, offset);
164+
}
165+
166+
pullRequest.setPreviouslyLocked(true);
167+
pullRequest.setNextOffset(offset);
168+
}
169+
} else {
170+
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
171+
log.info("pull message later because not locked in broker, {}", pullRequest);
172+
return;
173+
}
174+
```
175+
176+
第 9 步:`根据主题名称获取订阅信息,如果为空,将请求延迟3s放入队列`
177+
178+
```java
179+
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
180+
if (null == subscriptionData) {
181+
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
182+
log.warn("find the consumer's subscription failed, {}", pullRequest);
183+
return;
184+
}
185+
```
186+
187+
第 10 步:`创建PullCallback,为后面调用 拉取消息api做准备`
188+
189+
```java
190+
PullCallback pullCallback = new PullCallback() {
191+
@Override
192+
public void onSuccess(PullResult pullResult) {
193+
if (pullResult != null) {
194+
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
195+
subscriptionData);
196+
197+
switch (pullResult.getPullStatus()) {
198+
caseFOUND:
199+
long prevRequestOffset = pullRequest.getNextOffset();
200+
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
201+
long pullRT = System.currentTimeMillis() - beginTimestamp;
202+
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
203+
pullRequest.getMessageQueue().getTopic(), pullRT);
204+
205+
long firstMsgOffset = Long.MAX_VALUE;
206+
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
207+
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
208+
} else {
209+
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
210+
211+
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
212+
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
213+
214+
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
215+
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
216+
pullResult.getMsgFoundList(),
217+
processQueue,
218+
pullRequest.getMessageQueue(),
219+
dispatchToConsume);
220+
221+
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
222+
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
223+
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
224+
} else {
225+
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
226+
}
227+
}
228+
229+
if (pullResult.getNextBeginOffset() < prevRequestOffset
230+
|| firstMsgOffset < prevRequestOffset) {
231+
log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
232+
pullResult.getNextBeginOffset(),
233+
firstMsgOffset,
234+
prevRequestOffset);
235+
}
236+
237+
break;
238+
caseNO_NEW_MSG:
239+
caseNO_MATCHED_MSG:
240+
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
241+
242+
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
243+
244+
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
245+
break;
246+
caseOFFSET_ILLEGAL:
247+
log.warn("the pull request offset illegal, {} {}",
248+
pullRequest.toString(), pullResult.toString());
249+
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
250+
251+
pullRequest.getProcessQueue().setDropped(true);
252+
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
253+
254+
@Override
255+
public void run() {
256+
try {
257+
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
258+
pullRequest.getNextOffset(), false);
259+
260+
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
261+
262+
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
263+
264+
log.warn("fix the pull request offset, {}", pullRequest);
265+
} catch (Throwable e) {
266+
log.error("executeTaskLater Exception", e);
267+
}
268+
}
269+
}, 10000);
270+
break;
271+
default:
272+
break;
273+
}
274+
}
275+
}
276+
277+
@Override
278+
public void onException(Throwable e) {
279+
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
280+
log.warn("execute the pull request exception", e);
281+
}
282+
283+
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
284+
}
285+
};
286+
```
287+
288+
第 11 步:`设置系统标记`
289+
290+
`FLAG_COMMIT_OFFSET: 消费进度 大于0`
291+
292+
`FLAG_SUSPEND: 拉取消息时支持线程挂起`
293+
294+
`FLAG_SUBSCRIPTION: 消息过滤机制表达式`
295+
296+
`FLAG_CLASS_FILTER: 消息过滤机制是否为类过滤`
297+
298+
```java
299+
int sysFlag = PullSysFlag.buildSysFlag(
300+
commitOffsetEnable, // commitOffset
301+
true, // suspend
302+
subExpression != null, // subscription
303+
classFilter // class filter
304+
);
305+
```
306+
307+
第 12 步:`调用 broker 拉取消息`
308+
309+
```java
310+
// 每一个参数的含义如下
311+
this.pullAPIWrapper.pullKernelImpl(
312+
pullRequest.getMessageQueue(), // 要拉取的消息队列
313+
subExpression, // 消息过滤表达式
314+
subscriptionData.getExpressionType(), // 过滤表达式类型
315+
subscriptionData.getSubVersion(), // 时间戳
316+
pullRequest.getNextOffset(), // 消息拉取的开始偏移量
317+
this.defaultMQPushConsumer.getPullBatchSize(), // 拉取消息的数量 默认32条
318+
sysFlag, // 系统标记
319+
commitOffsetValue, // 消费的偏移量
320+
BROKER_SUSPEND_MAX_TIME_MILLIS, // 允许broker挂起的时间 默认15s
321+
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 允许的超时时间 默认30s
322+
CommunicationMode.ASYNC, // 默认为异步拉取
323+
pullCallback // 拉取消息之后的回调
324+
);
325+
```

0 commit comments

Comments
 (0)