Skip to content

Commit ed942f9

Browse files
committed
[client] Introduce route and route collection. Rework tags
1 parent 5038676 commit ed942f9

13 files changed

+1780
-160
lines changed

pkg/enqueue/Client/Route.php

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
final class Route
6+
{
7+
const TOPIC = 'enqueue.client.topic_route';
8+
9+
const COMMAND = 'enqueue.client.command_route';
10+
11+
/**
12+
* @var string
13+
*/
14+
private $source;
15+
16+
/**
17+
* @var string
18+
*/
19+
private $sourceType;
20+
21+
/**
22+
* @var string
23+
*/
24+
private $processor;
25+
26+
/**
27+
* @var array
28+
*/
29+
private $options;
30+
31+
public function __construct(
32+
string $source,
33+
string $sourceType,
34+
string $processor,
35+
array $options = []
36+
) {
37+
$this->source = $source;
38+
$this->sourceType = $sourceType;
39+
$this->processor = $processor;
40+
$this->options = $options;
41+
}
42+
43+
public function getSource(): string
44+
{
45+
return $this->source;
46+
}
47+
48+
public function isCommand(): bool
49+
{
50+
return self::COMMAND === $this->sourceType;
51+
}
52+
53+
public function isTopic(): bool
54+
{
55+
return self::TOPIC === $this->sourceType;
56+
}
57+
58+
public function getProcessor(): string
59+
{
60+
return $this->processor;
61+
}
62+
63+
public function isProcessorExclusive(): bool
64+
{
65+
return (bool) $this->getOption('exclusive', false);
66+
}
67+
68+
public function isProcessorExternal(): bool
69+
{
70+
return (bool) $this->getOption('external', false);
71+
}
72+
73+
public function getQueue(): ?string
74+
{
75+
return $this->getOption('queue');
76+
}
77+
78+
public function isPrefixQueue(): bool
79+
{
80+
return (bool) $this->getOption('prefix_queue', false);
81+
}
82+
83+
public function getOptions(): array
84+
{
85+
return $this->options;
86+
}
87+
88+
public function getOption(string $name, $default = null)
89+
{
90+
return array_key_exists($name, $this->options) ? $this->options[$name] : $default;
91+
}
92+
93+
public function toArray(): array
94+
{
95+
return array_replace($this->options, [
96+
'source' => $this->source,
97+
'source_type' => $this->sourceType,
98+
'processor' => $this->processor,
99+
]);
100+
}
101+
102+
public static function fromArray(array $route): self
103+
{
104+
list(
105+
'source' => $source,
106+
'source_type' => $sourceType,
107+
'processor' => $processor) = $route;
108+
109+
unset($route['source'], $route['source_type'], $route['processor']);
110+
$options = $route;
111+
112+
return new self($source, $sourceType, $processor, $options);
113+
}
114+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<?php
2+
3+
namespace Enqueue\Client;
4+
5+
class RouteCollection
6+
{
7+
/**
8+
* @var Route[]
9+
*/
10+
private $routes;
11+
12+
/**
13+
* @var Route[]
14+
*/
15+
private $commandRoutes;
16+
17+
/**
18+
* @var Route[]
19+
*/
20+
private $topicRoutes;
21+
22+
/**
23+
* @param Route[] $routes
24+
*/
25+
public function __construct(array $routes)
26+
{
27+
$this->routes = $routes;
28+
}
29+
30+
public function add(Route $route): void
31+
{
32+
$this->routes[] = $route;
33+
$this->topicRoutes = null;
34+
$this->commandRoutes = null;
35+
}
36+
37+
/**
38+
* @return Route[]
39+
*/
40+
public function allRoutes(): array
41+
{
42+
return $this->routes;
43+
}
44+
45+
/**
46+
* @return Route[]
47+
*/
48+
public function commandRoute(string $command): ?Route
49+
{
50+
if (null === $this->commandRoutes) {
51+
$commandRoutes = [];
52+
foreach ($this->routes as $route) {
53+
if ($route->isCommand()) {
54+
$commandRoutes[$route->getSource()] = $route;
55+
}
56+
}
57+
58+
$this->commandRoutes = $commandRoutes;
59+
}
60+
61+
return array_key_exists($command, $this->commandRoutes) ? $this->commandRoutes[$command] : null;
62+
}
63+
64+
/**
65+
* @return Route[]
66+
*/
67+
public function topicRoutes(string $topic): array
68+
{
69+
if (null === $this->topicRoutes) {
70+
$topicRoutes = [];
71+
foreach ($this->routes as $route) {
72+
if ($route->isTopic()) {
73+
$topicRoutes[$route->getSource()][$route->getProcessor()] = $route;
74+
}
75+
}
76+
77+
$this->topicRoutes = $topicRoutes;
78+
}
79+
80+
return array_key_exists($topic, $this->topicRoutes) ? $this->topicRoutes[$topic] : [];
81+
}
82+
83+
public function toArray(): array
84+
{
85+
$rawRoutes = [];
86+
foreach ($this->routes as $route) {
87+
$rawRoutes[] = $route->toArray();
88+
}
89+
90+
return $rawRoutes;
91+
}
92+
93+
public static function fromArray(array $rawRoutes): self
94+
{
95+
$routes = [];
96+
foreach ($rawRoutes as $rawRoute) {
97+
$routes[] = Route::fromArray($rawRoute);
98+
}
99+
100+
return new self($routes);
101+
}
102+
}

pkg/enqueue/Client/RouterProcessor.php

Lines changed: 35 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -7,115 +7,72 @@
77
use Interop\Queue\PsrMessage;
88
use Interop\Queue\PsrProcessor;
99

10-
class RouterProcessor implements PsrProcessor
10+
final class RouterProcessor implements PsrProcessor
1111
{
1212
/**
1313
* @var DriverInterface
1414
*/
1515
private $driver;
1616

1717
/**
18-
* @var array
18+
* @var RouteCollection
1919
*/
20-
private $eventRoutes;
20+
private $routeCollection;
2121

22-
/**
23-
* @var array
24-
*/
25-
private $commandRoutes;
26-
27-
/**
28-
* @param DriverInterface $driver
29-
* @param array $eventRoutes
30-
* @param array $commandRoutes
31-
*/
32-
public function __construct(DriverInterface $driver, array $eventRoutes = [], array $commandRoutes = [])
22+
public function __construct(DriverInterface $driver, RouteCollection $routeCollection)
3323
{
3424
$this->driver = $driver;
35-
36-
$this->eventRoutes = $eventRoutes;
37-
$this->commandRoutes = $commandRoutes;
25+
$this->routeCollection = $routeCollection;
3826
}
3927

40-
/**
41-
* @param string $topicName
42-
* @param string $queueName
43-
* @param string $processorName
44-
*/
45-
public function add($topicName, $queueName, $processorName)
28+
public function process(PsrMessage $message, PsrContext $context): Result
4629
{
47-
if (Config::COMMAND_TOPIC === $topicName) {
48-
$this->commandRoutes[$processorName] = $queueName;
49-
} else {
50-
$this->eventRoutes[$topicName][] = [$processorName, $queueName];
30+
$topic = $message->getProperty(Config::PARAMETER_TOPIC_NAME);
31+
if ($topic) {
32+
return $this->routeEvent($topic, $message);
5133
}
52-
}
5334

54-
/**
55-
* {@inheritdoc}
56-
*/
57-
public function process(PsrMessage $message, PsrContext $context)
58-
{
59-
$topicName = $message->getProperty(Config::PARAMETER_TOPIC_NAME);
60-
if (false == $topicName) {
61-
return Result::reject(sprintf(
62-
'Got message without required parameter: "%s"',
63-
Config::PARAMETER_TOPIC_NAME
64-
));
35+
$command = $message->getProperty(Config::PARAMETER_COMMAND_NAME);
36+
if ($command) {
37+
return $this->routeCommand($command, $message);
6538
}
6639

67-
if (Config::COMMAND_TOPIC === $topicName) {
68-
return $this->routeCommand($message);
69-
}
70-
71-
return $this->routeEvent($message);
40+
return Result::reject(sprintf(
41+
'Got message without required parameters. Either "%s" or "%s" property should be set',
42+
Config::PARAMETER_TOPIC_NAME,
43+
Config::PARAMETER_COMMAND_NAME
44+
));
7245
}
7346

74-
/**
75-
* @param PsrMessage $message
76-
*
77-
* @return string|Result
78-
*/
79-
private function routeEvent(PsrMessage $message)
47+
private function routeEvent(string $topic, PsrMessage $message): Result
8048
{
81-
$topicName = $message->getProperty(Config::PARAMETER_TOPIC_NAME);
49+
$count = 0;
50+
foreach ($this->routeCollection->topicRoutes($topic) as $route) {
51+
$processorMessage = clone $message;
52+
$processorMessage->setProperty(Config::PARAMETER_PROCESSOR_NAME, $route->getProcessor());
53+
$processorMessage->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $route->getQueue());
8254

83-
if (array_key_exists($topicName, $this->eventRoutes)) {
84-
foreach ($this->eventRoutes[$topicName] as $route) {
85-
$processorMessage = clone $message;
86-
$processorMessage->setProperty(Config::PARAMETER_PROCESSOR_NAME, $route[0]);
87-
$processorMessage->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $route[1]);
55+
$this->driver->sendToProcessor($this->driver->createClientMessage($processorMessage));
8856

89-
$this->driver->sendToProcessor($this->driver->createClientMessage($processorMessage));
90-
}
57+
++$count;
9158
}
9259

93-
return self::ACK;
60+
return Result::ack(sprintf('Routed to "%d" event subscribers', $count));
9461
}
9562

96-
/**
97-
* @param PsrMessage $message
98-
*
99-
* @return string|Result
100-
*/
101-
private function routeCommand(PsrMessage $message)
63+
private function routeCommand(string $command, PsrMessage $message): Result
10264
{
103-
$commandName = $message->getProperty(Config::PARAMETER_COMMAND_NAME);
104-
if (false == $commandName) {
105-
return Result::reject(sprintf(
106-
'Got message without required parameter: "%s"',
107-
Config::PARAMETER_COMMAND_NAME
108-
));
65+
$route = $this->routeCollection->commandRoute($command);
66+
if (false == $route) {
67+
throw new \LogicException(sprintf('The command "%s" processor not found', $command));
10968
}
11069

111-
if (isset($this->commandRoutes[$commandName])) {
112-
$processorMessage = clone $message;
113-
$processorMessage->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->commandRoutes[$commandName]);
114-
$processorMessage->setProperty(Config::PARAMETER_PROCESSOR_NAME, $commandName);
70+
$processorMessage = clone $message;
71+
$processorMessage->setProperty(Config::PARAMETER_PROCESSOR_NAME, $route->getProcessor());
72+
$processorMessage->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $route->getQueue());
11573

116-
$this->driver->sendToProcessor($this->driver->createClientMessage($processorMessage));
117-
}
74+
$this->driver->sendToProcessor($this->driver->createClientMessage($processorMessage));
11875

119-
return self::ACK;
76+
return Result::ack('Routed to the command processor');
12077
}
12178
}

pkg/enqueue/Client/TopicSubscriberInterface.php

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,22 @@ interface TopicSubscriberInterface
77
/**
88
* The result maybe either:.
99
*
10-
* ['aTopicName']
10+
* 'aTopicName'
1111
*
1212
* or
1313
*
14-
* ['aTopicName' => [
15-
* 'processorName' => 'processor',
16-
* 'queueName' => 'a_client_queue_name',
17-
* 'queueNameHardcoded' => true,
18-
* ]]
14+
* ['aTopicName', 'anotherTopicName']
15+
*
16+
* or
1917
*
20-
* processorName, queueName and queueNameHardcoded are optional.
18+
* ['aTopicName' => [
19+
* 'processor' => 'processor',
20+
* 'queue' => 'a_client_queue_name',
21+
* ]]
2122
*
2223
* Note: If you set queueNameHardcoded to true then the queueName is used as is and therefor the driver is not used to create a transport queue name.
2324
*
24-
* @return array
25+
* @return string|array
2526
*/
2627
public static function getSubscribedTopics();
2728
}

0 commit comments

Comments
 (0)