@@ -168,39 +168,47 @@ public function consume(ExtensionInterface $runtimeExtension = null)
168168 $ logger = $ context ->getLogger () ?: new NullLogger ();
169169 $ logger ->info ('Start consuming ' );
170170
171- while (true ) {
172- try {
173- if ($ this ->psrContext instanceof AmqpContext) {
174- $ callback = function (AmqpMessage $ message , AmqpConsumer $ consumer ) use ($ extension , $ logger , &$ context ) {
175- $ currentProcessor = null ;
171+ if ($ this ->psrContext instanceof AmqpContext) {
172+ $ callback = function (AmqpMessage $ message , AmqpConsumer $ consumer ) use ($ extension , $ logger , &$ context ) {
173+ $ currentProcessor = null ;
174+
175+ /** @var PsrQueue $queue */
176+ foreach ($ this ->boundProcessors as list ($ queue , $ processor )) {
177+ if ($ queue ->getQueueName () === $ consumer ->getQueue ()->getQueueName ()) {
178+ $ currentProcessor = $ processor ;
179+ }
180+ }
176181
177- /** @var PsrQueue $queue */
178- foreach ($ this ->boundProcessors as list ($ queue , $ processor )) {
179- if ($ queue ->getQueueName () === $ consumer ->getQueue ()->getQueueName ()) {
180- $ currentProcessor = $ processor ;
181- }
182- }
182+ if (false == $ currentProcessor ) {
183+ throw new \LogicException (sprintf ('The processor for the queue "%s" could not be found. ' , $ consumer ->getQueue ()->getQueueName ()));
184+ }
183185
184- if (false == $ currentProcessor ) {
185- throw new \LogicException (sprintf ('The processor for the queue "%s" could not be found. ' , $ consumer ->getQueue ()->getQueueName ()));
186- }
186+ $ context = new Context ($ this ->psrContext );
187+ $ context ->setLogger ($ logger );
188+ $ context ->setPsrQueue ($ consumer ->getQueue ());
189+ $ context ->setPsrConsumer ($ consumer );
190+ $ context ->setPsrProcessor ($ currentProcessor );
191+ $ context ->setPsrMessage ($ message );
187192
188- $ context = new Context ($ this ->psrContext );
189- $ context ->setLogger ($ logger );
190- $ context ->setPsrQueue ($ consumer ->getQueue ());
191- $ context ->setPsrConsumer ($ consumer );
192- $ context ->setPsrProcessor ($ currentProcessor );
193- $ context ->setPsrMessage ($ message );
193+ $ this ->doConsume ($ extension , $ context );
194194
195- $ this ->doConsume ($ extension , $ context );
195+ return true ;
196+ };
196197
197- return true ;
198- };
198+ foreach ( $ consumers as $ consumer ) {
199+ /* @var AmqpConsumer $consumer */
199200
200- foreach ($ consumers as $ consumer ) {
201- /* @var AmqpConsumer $consumer */
201+ $ this ->psrContext ->subscribe ($ consumer , $ callback );
202+ }
203+ }
202204
203- $ this ->psrContext ->subscribe ($ consumer , $ callback );
205+ while (true ) {
206+ try {
207+ if ($ this ->psrContext instanceof AmqpContext) {
208+ $ extension ->onBeforeReceive ($ context );
209+
210+ if ($ context ->isExecutionInterrupted ()) {
211+ throw new ConsumptionInterruptedException ();
204212 }
205213
206214 $ this ->psrContext ->consume ($ this ->receiveTimeout );
@@ -266,16 +274,14 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
266274 $ consumer = $ context ->getPsrConsumer ();
267275 $ logger = $ context ->getLogger ();
268276
269- if (false == $ context ->getPsrMessage () instanceof AmqpContext) {
270- $ extension ->onBeforeReceive ($ context );
271- }
272-
273277 if ($ context ->isExecutionInterrupted ()) {
274278 throw new ConsumptionInterruptedException ();
275279 }
276280
277281 $ message = $ context ->getPsrMessage ();
278282 if (false == $ message ) {
283+ $ extension ->onBeforeReceive ($ context );
284+
279285 if ($ message = $ consumer ->receive ($ this ->receiveTimeout )) {
280286 $ context ->setPsrMessage ($ message );
281287 }
@@ -312,10 +318,6 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
312318 $ logger ->info (sprintf ('Message processed: %s ' , $ context ->getResult ()));
313319
314320 $ extension ->onPostReceived ($ context );
315-
316- if ($ context ->getPsrMessage () instanceof AmqpContext) {
317- $ extension ->onBeforeReceive ($ context );
318- }
319321 } else {
320322 usleep ($ this ->idleTimeout * 1000 );
321323 $ extension ->onIdle ($ context );
0 commit comments