Skip to content

Commit 2f6e902

Browse files
committed
add driver send result
1 parent d5d0a98 commit 2f6e902

5 files changed

Lines changed: 73 additions & 9 deletions

File tree

pkg/enqueue/Client/Driver/GenericDriver.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use Enqueue\Client\Config;
88
use Enqueue\Client\DriverInterface;
9+
use Enqueue\Client\DriverSendResult;
910
use Enqueue\Client\Message;
1011
use Enqueue\Client\MessagePriority;
1112
use Enqueue\Client\Route;
@@ -45,7 +46,7 @@ public function __construct(
4546
$this->routeCollection = $routeCollection;
4647
}
4748

48-
public function sendToRouter(Message $message): void
49+
public function sendToRouter(Message $message): DriverSendResult
4950
{
5051
if ($message->getProperty(Config::COMMAND)) {
5152
throw new \LogicException('Command must not be send to router but go directly to its processor.');
@@ -59,9 +60,11 @@ public function sendToRouter(Message $message): void
5960
$producer = $this->getContext()->createProducer();
6061

6162
$this->doSendToRouter($producer, $topic, $transportMessage);
63+
64+
return new DriverSendResult($topic, $transportMessage);
6265
}
6366

64-
public function sendToProcessor(Message $message): void
67+
public function sendToProcessor(Message $message): DriverSendResult
6568
{
6669
$topic = $message->getProperty(Config::TOPIC);
6770
$command = $message->getProperty(Config::COMMAND);
@@ -111,6 +114,8 @@ public function sendToProcessor(Message $message): void
111114
}
112115

113116
$this->doSendToProcessor($producer, $queue, $transportMessage);
117+
118+
return new DriverSendResult($queue, $transportMessage);
114119
}
115120

116121
public function setupBroker(LoggerInterface $logger = null): void

pkg/enqueue/Client/DriverInterface.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ public function createTransportMessage(Message $message): InteropMessage;
1515

1616
public function createClientMessage(InteropMessage $message): Message;
1717

18-
public function sendToRouter(Message $message): void;
18+
public function sendToRouter(Message $message): DriverSendResult;
1919

20-
public function sendToProcessor(Message $message): void;
20+
public function sendToProcessor(Message $message): DriverSendResult;
2121

2222
public function createQueue(string $queueName, bool $prefix = true): InteropQueue;
2323

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
use Interop\Queue\Destination;
6+
use Interop\Queue\Message as TransportMessage;
7+
8+
final class DriverSendResult
9+
{
10+
/**
11+
* @var Destination
12+
*/
13+
private $transportDestination;
14+
15+
/**
16+
* @var TransportMessage
17+
*/
18+
private $transportMessage;
19+
20+
public function __construct(Destination $transportDestination, TransportMessage $transportMessage)
21+
{
22+
$this->transportDestination = $transportDestination;
23+
$this->transportMessage = $transportMessage;
24+
}
25+
26+
public function getTransportDestination(): Destination
27+
{
28+
return $this->transportDestination;
29+
}
30+
31+
public function getTransportMessage(): TransportMessage
32+
{
33+
return $this->transportMessage;
34+
}
35+
}

pkg/enqueue/Client/PostSend.php

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
namespace Enqueue\Client;
44

5+
use Interop\Queue\Destination;
6+
use Interop\Queue\Message as TransportMessage;
7+
58
final class PostSend
69
{
710
private $message;
@@ -10,11 +13,22 @@ final class PostSend
1013

1114
private $driver;
1215

13-
public function __construct(Message $message, ProducerInterface $producer, DriverInterface $driver)
14-
{
16+
private $transportDestination;
17+
18+
private $transportMessage;
19+
20+
public function __construct(
21+
Message $message,
22+
ProducerInterface $producer,
23+
DriverInterface $driver,
24+
Destination $transportDestination,
25+
TransportMessage $transportMessage
26+
) {
1527
$this->message = $message;
1628
$this->producer = $producer;
1729
$this->driver = $driver;
30+
$this->transportDestination = $transportDestination;
31+
$this->transportMessage = $transportMessage;
1832
}
1933

2034
public function getMessage(): Message
@@ -32,6 +46,16 @@ public function getDriver(): DriverInterface
3246
return $this->driver;
3347
}
3448

49+
public function getTransportDestination(): Destination
50+
{
51+
return $this->transportDestination;
52+
}
53+
54+
public function getTransportMessage(): TransportMessage
55+
{
56+
return $this->transportMessage;
57+
}
58+
3559
public function isEvent(): bool
3660
{
3761
return (bool) $this->message->getProperty(Config::TOPIC);

pkg/enqueue/Client/Producer.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,13 @@ private function doSend(Message $message): void
118118
$this->extension->onDriverPreSend(new DriverPreSend($message, $this, $this->driver));
119119

120120
if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) {
121-
$this->driver->sendToRouter($message);
121+
$result = $this->driver->sendToRouter($message);
122122
} elseif (Message::SCOPE_APP == $message->getScope()) {
123-
$this->driver->sendToProcessor($message);
123+
$result = $this->driver->sendToProcessor($message);
124124
} else {
125125
throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope()));
126126
}
127127

128-
$this->extension->onPostSend(new PostSend($message, $this, $this->driver));
128+
$this->extension->onPostSend(new PostSend($message, $this, $this->driver, $result->getTransportDestination(), $result->getTransportMessage()));
129129
}
130130
}

0 commit comments

Comments
 (0)