@@ -50,8 +50,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
5050 $ kafkaConsumer = $ this ->createKafkaConsumerMock ();
5151 $ kafkaConsumer
5252 ->expects ($ this ->once ())
53- ->method ('subscribe ' )
54- ->with (['dest ' ])
53+ ->method ('assign ' )
5554 ;
5655 $ kafkaConsumer
5756 ->expects ($ this ->once ())
@@ -70,6 +69,36 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
7069 $ this ->assertNull ($ consumer ->receive (1000 ));
7170 }
7271
72+ public function testShouldPassProperlyConfiguredTopicPartitionOnAssign ()
73+ {
74+ $ destination = new RdKafkaTopic ('dest ' );
75+
76+ $ kafkaMessage = new Message ();
77+ $ kafkaMessage ->err = RD_KAFKA_RESP_ERR__TIMED_OUT ;
78+
79+ $ kafkaConsumer = $ this ->createKafkaConsumerMock ();
80+ $ kafkaConsumer
81+ ->expects ($ this ->once ())
82+ ->method ('assign ' )
83+ ;
84+ $ kafkaConsumer
85+ ->expects ($ this ->any ())
86+ ->method ('consume ' )
87+ ->willReturn ($ kafkaMessage )
88+ ;
89+
90+ $ consumer = new RdKafkaConsumer (
91+ $ kafkaConsumer ,
92+ $ this ->createContextMock (),
93+ $ destination ,
94+ $ this ->createSerializerMock ()
95+ );
96+
97+ $ consumer ->receive (1000 );
98+ $ consumer ->receive (1000 );
99+ $ consumer ->receive (1000 );
100+ }
101+
73102 public function testShouldSubscribeOnFirstReceiveOnly ()
74103 {
75104 $ destination = new RdKafkaTopic ('dest ' );
@@ -80,8 +109,7 @@ public function testShouldSubscribeOnFirstReceiveOnly()
80109 $ kafkaConsumer = $ this ->createKafkaConsumerMock ();
81110 $ kafkaConsumer
82111 ->expects ($ this ->once ())
83- ->method ('subscribe ' )
84- ->with (['dest ' ])
112+ ->method ('assign ' )
85113 ;
86114 $ kafkaConsumer
87115 ->expects ($ this ->any ())
@@ -101,6 +129,38 @@ public function testShouldSubscribeOnFirstReceiveOnly()
101129 $ consumer ->receive (1000 );
102130 }
103131
132+ public function testThrowOnOffsetChangeAfterSubscribing ()
133+ {
134+ $ destination = new RdKafkaTopic ('dest ' );
135+
136+ $ kafkaMessage = new Message ();
137+ $ kafkaMessage ->err = RD_KAFKA_RESP_ERR__TIMED_OUT ;
138+
139+ $ kafkaConsumer = $ this ->createKafkaConsumerMock ();
140+ $ kafkaConsumer
141+ ->expects ($ this ->once ())
142+ ->method ('assign ' )
143+ ;
144+ $ kafkaConsumer
145+ ->expects ($ this ->any ())
146+ ->method ('consume ' )
147+ ->willReturn ($ kafkaMessage )
148+ ;
149+
150+ $ consumer = new RdKafkaConsumer (
151+ $ kafkaConsumer ,
152+ $ this ->createContextMock (),
153+ $ destination ,
154+ $ this ->createSerializerMock ()
155+ );
156+
157+ $ consumer ->receive (1000 );
158+
159+ $ this ->expectException (\LogicException::class);
160+ $ this ->expectExceptionMessage ('The consumer has already subscribed. ' );
161+ $ consumer ->setOffset (123 );
162+ }
163+
104164 public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue ()
105165 {
106166 $ destination = new RdKafkaTopic ('dest ' );
@@ -114,8 +174,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
114174 $ kafkaConsumer = $ this ->createKafkaConsumerMock ();
115175 $ kafkaConsumer
116176 ->expects ($ this ->once ())
117- ->method ('subscribe ' )
118- ->with (['dest ' ])
177+ ->method ('assign ' )
119178 ;
120179 $ kafkaConsumer
121180 ->expects ($ this ->once ())
0 commit comments