Skip to content

Commit 6c0e31d

Browse files
committed
[client] Improve client extension.
1 parent 0560f5a commit 6c0e31d

11 files changed

Lines changed: 491 additions & 358 deletions

File tree

pkg/enqueue/Client/ChainExtension.php

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,31 @@ public function __construct(array $extensions)
1717
$this->extensions = $extensions;
1818
}
1919

20-
/**
21-
* {@inheritdoc}
22-
*/
23-
public function onPreSend($topic, Message $message)
20+
public function onPreSendEvent(PreSend $event): void
2421
{
2522
foreach ($this->extensions as $extension) {
26-
$extension->onPreSend($topic, $message);
23+
$extension->onPreSendEvent($event);
2724
}
2825
}
2926

30-
/**
31-
* {@inheritdoc}
32-
*/
33-
public function onPostSend($topic, Message $message)
27+
public function onPreSendCommand(PreSend $event): void
28+
{
29+
foreach ($this->extensions as $extension) {
30+
$extension->onPreSendCommand($event);
31+
}
32+
}
33+
34+
public function onPreDriverSend(PreDriverSend $context): void
35+
{
36+
foreach ($this->extensions as $extension) {
37+
$extension->onPreDriverSend($context);
38+
}
39+
}
40+
41+
public function onPostSend(PostSend $event): void
3442
{
3543
foreach ($this->extensions as $extension) {
36-
$extension->onPostSend($topic, $message);
44+
$extension->onPostSend($event);
3745
}
3846
}
3947
}

pkg/enqueue/Client/ConsumptionExtension/ExclusiveCommandExtension.php

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@
33
namespace Enqueue\Client\ConsumptionExtension;
44

55
use Enqueue\Client\Config;
6+
use Enqueue\Client\EmptyExtensionTrait as ClientEmptyExtensionTrait;
67
use Enqueue\Client\ExtensionInterface as ClientExtensionInterface;
7-
use Enqueue\Client\Message;
8+
use Enqueue\Client\PreSend;
89
use Enqueue\Consumption\Context;
9-
use Enqueue\Consumption\EmptyExtensionTrait;
10+
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
1011
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;
1112

1213
class ExclusiveCommandExtension implements ConsumptionExtensionInterface, ClientExtensionInterface
1314
{
14-
use EmptyExtensionTrait;
15+
use ConsumptionEmptyExtensionTrait, ClientEmptyExtensionTrait;
1516

1617
/**
1718
* @var string[]
@@ -60,26 +61,14 @@ public function onPreReceived(Context $context)
6061
}
6162
}
6263

63-
/**
64-
* {@inheritdoc}
65-
*/
66-
public function onPreSend($topic, Message $message)
64+
public function onPreSendCommand(PreSend $context): void
6765
{
68-
if (Config::COMMAND_TOPIC != $topic) {
69-
return;
70-
}
66+
$message = $context->getMessage();
67+
$command = $context->getCommandOrTopic();
7168

72-
$commandName = $message->getProperty(Config::PARAMETER_COMMAND_NAME);
73-
if (array_key_exists($commandName, $this->processorNameToQueueNameMap)) {
74-
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $commandName);
75-
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->processorNameToQueueNameMap[$commandName]);
69+
if (array_key_exists($command, $this->processorNameToQueueNameMap)) {
70+
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command);
71+
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->processorNameToQueueNameMap[$command]);
7672
}
7773
}
78-
79-
/**
80-
* {@inheritdoc}
81-
*/
82-
public function onPostSend($topic, Message $message)
83-
{
84-
}
8574
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
trait EmptyExtensionTrait
6+
{
7+
public function onPreSendEvent(PreSend $context): void
8+
{
9+
}
10+
11+
public function onPreSendCommand(PreSend $context): void
12+
{
13+
}
14+
15+
public function onPreDriverSend(PreDriverSend $context): void
16+
{
17+
}
18+
19+
public function onPostSend(PostSend $context): void
20+
{
21+
}
22+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Enqueue\Client\Extension;
4+
5+
use Enqueue\Client\EmptyExtensionTrait;
6+
use Enqueue\Client\ExtensionInterface;
7+
use Enqueue\Client\Message;
8+
use Enqueue\Client\PreSend;
9+
use Enqueue\Util\JSON;
10+
11+
class PrepareBodyExtension implements ExtensionInterface
12+
{
13+
use EmptyExtensionTrait;
14+
15+
public function onPreSendEvent(PreSend $context): void
16+
{
17+
$this->prepareBody($context->getMessage());
18+
}
19+
20+
public function onPreSendCommand(PreSend $context): void
21+
{
22+
$this->prepareBody($context->getMessage());
23+
}
24+
25+
private function prepareBody(Message $message): void
26+
{
27+
$body = $message->getBody();
28+
$contentType = $message->getContentType();
29+
30+
if (is_scalar($body) || null === $body) {
31+
$contentType = $contentType ?: 'text/plain';
32+
$body = (string) $body;
33+
} elseif (is_array($body)) {
34+
// only array of scalars is allowed.
35+
array_walk_recursive($body, function ($value) {
36+
if (!is_scalar($value) && null !== $value) {
37+
throw new \LogicException(sprintf(
38+
'The message\'s body must be an array of scalars. Found not scalar in the array: %s',
39+
is_object($value) ? get_class($value) : gettype($value)
40+
));
41+
}
42+
});
43+
44+
$contentType = $contentType ?: 'application/json';
45+
$body = JSON::encode($body);
46+
} elseif ($body instanceof \JsonSerializable) {
47+
$contentType = $contentType ?: 'application/json';
48+
$body = JSON::encode($body);
49+
} else {
50+
throw new \InvalidArgumentException(sprintf(
51+
'The message\'s body must be either null, scalar, array or object (implements \JsonSerializable). Got: %s',
52+
is_object($body) ? get_class($body) : gettype($body)
53+
));
54+
}
55+
56+
$message->setContentType($contentType);
57+
$message->setBody($body);
58+
}
59+
}

pkg/enqueue/Client/ExtensionInterface.php

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,21 @@
44

55
interface ExtensionInterface
66
{
7-
/**
8-
* @param string $topic
9-
* @param Message $message
10-
*
11-
* @return
12-
*/
13-
public function onPreSend($topic, Message $message);
7+
public function onPreSendEvent(PreSend $context): void;
148

15-
/**
16-
* @param string $topic
17-
* @param Message $message
18-
*
19-
* @return
20-
*/
21-
public function onPostSend($topic, Message $message);
9+
public function onPreSendCommand(PreSend $context): void;
10+
11+
public function onPreDriverSend(PreDriverSend $context): void;
12+
13+
public function onPostSend(PostSend $context): void;
14+
15+
// /**
16+
// * @deprecated
17+
// */
18+
// public function onPreSend($topic, Message $message);
19+
//
20+
// /**
21+
// * @deprecated
22+
// */
23+
// public function onPostSend($topic, Message $message);
2224
}

pkg/enqueue/Client/PostSend.php

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
class PostSend
6+
{
7+
private $message;
8+
9+
private $producer;
10+
11+
private $driver;
12+
13+
public function __construct(Message $message, ProducerInterface $producer, DriverInterface $driver)
14+
{
15+
$this->message = $message;
16+
$this->producer = $producer;
17+
$this->driver = $driver;
18+
}
19+
20+
public function getMessage(): Message
21+
{
22+
return $this->message;
23+
}
24+
25+
public function getProducer(): ProducerInterface
26+
{
27+
return $this->producer;
28+
}
29+
30+
public function getDriver(): DriverInterface
31+
{
32+
return $this->driver;
33+
}
34+
35+
public function isEvent(): bool
36+
{
37+
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
38+
}
39+
40+
public function getCommand(): string
41+
{
42+
return $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
43+
}
44+
45+
public function getTopic(): string
46+
{
47+
return $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
48+
}
49+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
class PreDriverSend
6+
{
7+
private $message;
8+
9+
private $producer;
10+
11+
private $driver;
12+
13+
public function __construct(Message $message, ProducerInterface $producer, DriverInterface $driver)
14+
{
15+
$this->message = $message;
16+
$this->producer = $producer;
17+
$this->driver = $driver;
18+
}
19+
20+
public function getMessage(): Message
21+
{
22+
return $this->message;
23+
}
24+
25+
public function getProducer(): ProducerInterface
26+
{
27+
return $this->producer;
28+
}
29+
30+
public function getDriver(): DriverInterface
31+
{
32+
return $this->driver;
33+
}
34+
35+
public function isEvent(): bool
36+
{
37+
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
38+
}
39+
40+
public function getCommand(): string
41+
{
42+
return $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
43+
}
44+
45+
public function getTopic(): string
46+
{
47+
return $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
48+
}
49+
}

pkg/enqueue/Client/PreSend.php

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
class PreSend
6+
{
7+
private $message;
8+
9+
private $originalMessage;
10+
11+
private $commandOrTopic;
12+
13+
private $producer;
14+
15+
private $driver;
16+
17+
public function __construct(
18+
string $commandOrTopic,
19+
Message $message,
20+
ProducerInterface $producer,
21+
DriverInterface $driver
22+
) {
23+
$this->message = $message;
24+
$this->commandOrTopic = $commandOrTopic;
25+
$this->producer = $producer;
26+
$this->driver = $driver;
27+
28+
$this->originalMessage = clone $message;
29+
}
30+
31+
public function getCommandOrTopic(): string
32+
{
33+
return $this->commandOrTopic;
34+
}
35+
36+
public function changeCommandOrTopic(string $commandOrTopic): void
37+
{
38+
$this->commandOrTopic = $commandOrTopic;
39+
}
40+
41+
public function changeBody($body, string $contentType = null): void
42+
{
43+
$this->message->setBody($body);
44+
45+
if (null !== $contentType) {
46+
$this->message->setContentType($contentType);
47+
}
48+
}
49+
50+
public function getMessage(): Message
51+
{
52+
return $this->message;
53+
}
54+
55+
public function getOriginalMessage(): Message
56+
{
57+
return $this->originalMessage;
58+
}
59+
60+
public function getProducer(): ProducerInterface
61+
{
62+
return $this->producer;
63+
}
64+
65+
public function getDriver(): DriverInterface
66+
{
67+
return $this->driver;
68+
}
69+
}

0 commit comments

Comments
 (0)