Skip to content

Commit f2a67e8

Browse files
committed
[client] fixes and cleanups.
1 parent 26a91d8 commit f2a67e8

File tree

7 files changed

+376
-61
lines changed

7 files changed

+376
-61
lines changed

docs/bundle/job_queue.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class Step1Processor implements PsrProcessor
8989
$runner->createDelayed(
9090
$jobName,
9191
function (JobRunner $runner, Job $childJob) use ($entity) {
92-
$this->producer->sendCommand('search:index:index-single-entity', [
92+
$this->producer->sendEvent('search:index:index-single-entity', [
9393
'entityId' => $entity->getId(),
9494
'jobId' => $childJob->getId(),
9595
]);

docs/bundle/message_producer.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ where another one (it is called spool producer) collects them in memory and send
2626

2727
The producer has two types on send methods:
2828

29-
* `sendEvent` - Message is sent to topic and many consumers can subscriber to it. It is "fire and forget" strategy.
30-
* `sendCommand` - Message is to ONE exact consumer. It could be used as "fire and forget" or as RPC.
29+
* `sendEvent` - Message is sent to topic and many consumers can subscriber to it. It is "fire and forget" strategy. The even could be sent to "message bus" to other applications.
30+
* `sendCommand` - Message is to ONE exact consumer. It could be used as "fire and forget" or as RPC. The command message is always sent in scope of current application.
3131

3232
### Send event
3333

docs/client/message_examples.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,28 @@
11
# Client. Message examples
2+
3+
* [Scope](#scope)
4+
* [Delay](#delay)
5+
* [Expiration (TTL)](#expiration-ttl)
6+
* [Priority](#priority)
7+
* [Timestamp, Content type, Message id](#timestamp-content-type-message-id)
8+
9+
## Scope
10+
11+
There are two two types possible scopes: `Message:SCOPE_MESSAGE_BUS` and `Message::SCOPE_APP`.
12+
The first one instructs the client send messages (if driver supports) to the message bus so other apps can consume those messages.
13+
The second in turns limits the message to the application that sent it. No other apps could receive it.
14+
15+
```php
16+
<?php
17+
18+
use Enqueue\Client\Message;
19+
20+
$message = new Message();
21+
$message->setScope(Message::SCOPE_MESSAGE_BUS);
22+
23+
/** @var \Enqueue\Client\ProducerInterface $producer */
24+
$producer->sendEvent('aTopic', $message);
25+
```
226

327
## Delay
428

docs/quick_tour.md

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ It provides easy to use services for producing and processing messages.
163163
It supports unified format for setting message expiration, delay, timestamp, correlation id.
164164
It supports message bus so different applications can talk to each other.
165165

166-
Here's an example of how you can send and consume messages.
166+
Here's an example of how you can send and consume event messages.
167167

168168
```php
169169
<?php
@@ -179,15 +179,57 @@ $client = new SimpleClient('file://foo/bar');
179179
$client->setupBroker();
180180

181181
$client->bind('a_foo_topic', 'fooProcessor', function(PsrMessage $message) {
182-
// your processing logic here
182+
// your event processor logic here
183183
});
184184

185-
$client->send('a_bar_topic', 'aMessageData');
186-
187-
// in another process you can consume messages.
185+
// this is a blocking call, it'll consume message until it is interrupted
188186
$client->consume();
189187
```
190188

189+
and command messages:
190+
191+
```php
192+
<?php
193+
use Enqueue\SimpleClient\SimpleClient;
194+
use Enqueue\Psr\PsrMessage;
195+
use Enqueue\Psr\PsrContext;
196+
use Enqueue\Client\Config;
197+
use Enqueue\Consumption\Extension\ReplyExtension;
198+
use Enqueue\Consumption\Result;
199+
200+
// composer require enqueue/amqp-ext
201+
$client = new SimpleClient('amqp://');
202+
203+
// composer require enqueue/fs
204+
$client = new SimpleClient('file://foo/bar');
205+
$client->
206+
207+
$client->setupBroker();
208+
209+
$client->bind(Config::COMMAND_TOPIC, 'bar_command', function(PsrMessage $message) {
210+
// your bar command processor logic here
211+
});
212+
213+
$client->bind(Config::COMMAND_TOPIC, 'baz_reply_command', function(PsrMessage $message, PsrContext $context) {
214+
// your baz reply command processor logic here
215+
216+
return Result::reply($context->createMessage('theReplyBody'));
217+
});
218+
219+
// It is sent to one consumer.
220+
$client->sendCommand('bar_command', 'aMessageData');
221+
222+
// It is possible to get reply
223+
$promise = $client->sendCommand('bar_command', 'aMessageData', true);
224+
225+
// you can send several commands and only after start getting replies.
226+
227+
$replyMessage = $promise->receive(2000); // 2 sec
228+
229+
// this is a blocking call, it'll consume message until it is interrupted
230+
$client->consume([new ReplyExtension()]);
231+
```
232+
191233
## Cli commands
192234

193235
The library provides handy commands out of the box.

pkg/enqueue/Tests/Client/SpoolProducerTest.php

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\Client\Message;
66
use Enqueue\Client\ProducerInterface;
77
use Enqueue\Client\SpoolProducer;
8+
use Enqueue\Rpc\Promise;
89
use Enqueue\Test\ClassExtensionTrait;
910
use PHPUnit\Framework\TestCase;
1011

@@ -22,7 +23,7 @@ public function testCouldBeConstructedWithRealProducer()
2223
new SpoolProducer($this->createProducerMock());
2324
}
2425

25-
public function testShouldQueueMessageOnSend()
26+
public function testShouldQueueEventMessageOnSend()
2627
{
2728
$message = new Message();
2829

@@ -31,21 +32,44 @@ public function testShouldQueueMessageOnSend()
3132
->expects($this->never())
3233
->method('sendEvent')
3334
;
35+
$realProducer
36+
->expects($this->never())
37+
->method('sendCommand')
38+
;
3439

3540
$producer = new SpoolProducer($realProducer);
3641
$producer->sendEvent('foo_topic', $message);
3742
$producer->sendEvent('bar_topic', $message);
3843
}
3944

40-
public function testShouldSendQueuedMessagesOnFlush()
45+
public function testShouldQueueCommandMessageOnSend()
46+
{
47+
$message = new Message();
48+
49+
$realProducer = $this->createProducerMock();
50+
$realProducer
51+
->expects($this->never())
52+
->method('sendEvent')
53+
;
54+
$realProducer
55+
->expects($this->never())
56+
->method('sendCommand')
57+
;
58+
59+
$producer = new SpoolProducer($realProducer);
60+
$producer->sendCommand('foo_command', $message);
61+
$producer->sendCommand('bar_command', $message);
62+
}
63+
64+
public function testShouldSendQueuedEventMessagesOnFlush()
4165
{
4266
$message = new Message();
4367
$message->setScope('third');
4468

4569
$realProducer = $this->createProducerMock();
4670
$realProducer
4771
->expects($this->at(0))
48-
->method('sendEVent')
72+
->method('sendEvent')
4973
->with('foo_topic', 'first')
5074
;
5175
$realProducer
@@ -58,6 +82,10 @@ public function testShouldSendQueuedMessagesOnFlush()
5882
->method('sendEvent')
5983
->with('baz_topic', $this->identicalTo($message))
6084
;
85+
$realProducer
86+
->expects($this->never())
87+
->method('sendCommand')
88+
;
6189

6290
$producer = new SpoolProducer($realProducer);
6391

@@ -68,6 +96,63 @@ public function testShouldSendQueuedMessagesOnFlush()
6896
$producer->flush();
6997
}
7098

99+
public function testShouldSendQueuedCommandMessagesOnFlush()
100+
{
101+
$message = new Message();
102+
$message->setScope('third');
103+
104+
$realProducer = $this->createProducerMock();
105+
$realProducer
106+
->expects($this->at(0))
107+
->method('sendCommand')
108+
->with('foo_command', 'first')
109+
;
110+
$realProducer
111+
->expects($this->at(1))
112+
->method('sendCommand')
113+
->with('bar_command', ['second'])
114+
;
115+
$realProducer
116+
->expects($this->at(2))
117+
->method('sendCommand')
118+
->with('baz_command', $this->identicalTo($message))
119+
;
120+
121+
$producer = new SpoolProducer($realProducer);
122+
123+
$producer->sendCommand('foo_command', 'first');
124+
$producer->sendCommand('bar_command', ['second']);
125+
$producer->sendCommand('baz_command', $message);
126+
127+
$producer->flush();
128+
}
129+
130+
public function testShouldSendImmediatelyCommandMessageWithNeedReplyTrue()
131+
{
132+
$message = new Message();
133+
$message->setScope('third');
134+
135+
$promise = $this->createMock(Promise::class);
136+
137+
$realProducer = $this->createProducerMock();
138+
$realProducer
139+
->expects($this->never())
140+
->method('sendEvent')
141+
;
142+
$realProducer
143+
->expects($this->once())
144+
->method('sendCommand')
145+
->with('foo_command', 'first')
146+
->willReturn($promise)
147+
;
148+
149+
$producer = new SpoolProducer($realProducer);
150+
151+
$actualPromise = $producer->sendCommand('foo_command', 'first', true);
152+
153+
$this->assertSame($promise, $actualPromise);
154+
}
155+
71156
/**
72157
* @return \PHPUnit_Framework_MockObject_MockObject|ProducerInterface
73158
*/

0 commit comments

Comments
 (0)