66use Interop \Queue \PsrConsumer ;
77use Interop \Queue \PsrMessage ;
88use RdKafka \KafkaConsumer ;
9+ use RdKafka \TopicPartition ;
910
1011class RdKafkaConsumer implements PsrConsumer
1112{
@@ -36,6 +37,11 @@ class RdKafkaConsumer implements PsrConsumer
3637 */
3738 private $ commitAsync ;
3839
40+ /**
41+ * @var int|null
42+ */
43+ private $ offset ;
44+
3945 /**
4046 * @param KafkaConsumer $consumer
4147 * @param RdKafkaContext $context
@@ -49,6 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
4955 $ this ->topic = $ topic ;
5056 $ this ->subscribed = false ;
5157 $ this ->commitAsync = false ;
58+ $ this ->offset = null ;
5259
5360 $ this ->setSerializer ($ serializer );
5461 }
@@ -69,6 +76,15 @@ public function setCommitAsync($async)
6976 $ this ->commitAsync = (bool ) $ async ;
7077 }
7178
79+ public function setOffset ($ offset )
80+ {
81+ if ($ this ->subscribed ) {
82+ throw new \LogicException ('The consumer has already subscribed. ' );
83+ }
84+
85+ $ this ->offset = $ offset ;
86+ }
87+
7288 /**
7389 * {@inheritdoc}
7490 */
@@ -83,7 +99,11 @@ public function getQueue()
8399 public function receive ($ timeout = 0 )
84100 {
85101 if (false == $ this ->subscribed ) {
86- $ this ->consumer ->subscribe ([$ this ->topic ->getTopicName ()]);
102+ $ this ->consumer ->assign ([new TopicPartition (
103+ $ this ->getQueue ()->getQueueName (),
104+ $ this ->getQueue ()->getPartition (),
105+ $ this ->offset
106+ )]);
87107
88108 $ this ->subscribed = true ;
89109 }
0 commit comments