@@ -59,9 +59,34 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
5959 ->with (1000 )
6060 ->willReturn ($ kafkaMessage )
6161 ;
62+
63+ $ consumer = new RdKafkaConsumer (
64+ $ kafkaConsumer ,
65+ $ this ->createContextMock (),
66+ $ destination ,
67+ $ this ->createSerializerMock ()
68+ );
69+
70+ $ this ->assertNull ($ consumer ->receive (1000 ));
71+ }
72+
73+ public function testShouldSubscribeOnFirstReceiveOnly ()
74+ {
75+ $ destination = new RdKafkaTopic ('dest ' );
76+
77+ $ kafkaMessage = new Message ();
78+ $ kafkaMessage ->err = RD_KAFKA_RESP_ERR__TIMED_OUT ;
79+
80+ $ kafkaConsumer = $ this ->createKafkaConsumerMock ();
6281 $ kafkaConsumer
6382 ->expects ($ this ->once ())
64- ->method ('unsubscribe ' )
83+ ->method ('subscribe ' )
84+ ->with (['dest ' ])
85+ ;
86+ $ kafkaConsumer
87+ ->expects ($ this ->any ())
88+ ->method ('consume ' )
89+ ->willReturn ($ kafkaMessage )
6590 ;
6691
6792 $ consumer = new RdKafkaConsumer (
@@ -71,7 +96,9 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
7196 $ this ->createSerializerMock ()
7297 );
7398
74- $ this ->assertNull ($ consumer ->receive (1000 ));
99+ $ consumer ->receive (1000 );
100+ $ consumer ->receive (1000 );
101+ $ consumer ->receive (1000 );
75102 }
76103
77104 public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue ()
@@ -96,10 +123,6 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
96123 ->with (1000 )
97124 ->willReturn ($ kafkaMessage )
98125 ;
99- $ kafkaConsumer
100- ->expects ($ this ->once ())
101- ->method ('unsubscribe ' )
102- ;
103126
104127 $ serializer = $ this ->createSerializerMock ();
105128 $ serializer
0 commit comments