Skip to content

Commit 9d9f16e

Browse files
committed
Runnable queue
1 parent 6e73d64 commit 9d9f16e

7 files changed

Lines changed: 352 additions & 0 deletions

File tree

src/Process/Runnable/Runnable.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?php declare(strict_types = 1);
2+
3+
namespace PHPStan\Process\Runnable;
4+
5+
use React\Promise\PromiseInterface;
6+
7+
interface Runnable
8+
{
9+
10+
public function getName(): string;
11+
12+
public function run(): PromiseInterface;
13+
14+
public function cancel(): void;
15+
16+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php declare(strict_types = 1);
2+
3+
namespace PHPStan\Process\Runnable;
4+
5+
class RunnableCanceledException extends \Exception
6+
{
7+
8+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
<?php declare(strict_types = 1);
2+
3+
namespace PHPStan\Process\Runnable;
4+
5+
use React\Promise\CancellablePromiseInterface;
6+
use React\Promise\Deferred;
7+
use SplObjectStorage;
8+
9+
class RunnableQueue
10+
{
11+
12+
private RunnableQueueLogger $logger;
13+
14+
private int $maxSize;
15+
16+
/** @var array<array{Runnable, int, Deferred}> */
17+
private array $queue = [];
18+
19+
/** @var SplObjectStorage<Runnable, array{int, Deferred}> */
20+
private SplObjectStorage $running;
21+
22+
public function __construct(RunnableQueueLogger $logger, int $maxSize)
23+
{
24+
$this->logger = $logger;
25+
$this->maxSize = $maxSize;
26+
27+
/** @var SplObjectStorage<Runnable, array{int, Deferred}> $running */
28+
$running = new SplObjectStorage();
29+
$this->running = $running;
30+
}
31+
32+
public function getQueueSize(): int
33+
{
34+
$allSize = 0;
35+
foreach ($this->queue as [$runnable, $size, $deferred]) {
36+
$allSize += $size;
37+
}
38+
39+
return $allSize;
40+
}
41+
42+
public function getRunningSize(): int
43+
{
44+
$allSize = 0;
45+
foreach ($this->running as $running) { // phpcs:ignore
46+
[$size] = $this->running->getInfo();
47+
$allSize += $size;
48+
}
49+
50+
return $allSize;
51+
}
52+
53+
public function queue(Runnable $runnable, int $size): CancellablePromiseInterface
54+
{
55+
if ($size > $this->maxSize) {
56+
throw new \PHPStan\ShouldNotHappenException('Runnable size exceeds queue maxSize.');
57+
}
58+
59+
$deferred = new Deferred(static function () use ($runnable): void {
60+
$runnable->cancel();
61+
});
62+
$this->queue[] = [$runnable, $size, $deferred];
63+
$this->drainQueue();
64+
65+
/** @var CancellablePromiseInterface */
66+
return $deferred->promise();
67+
}
68+
69+
private function drainQueue(): void
70+
{
71+
if (count($this->queue) === 0) {
72+
$this->logger->log('Queue empty');
73+
return;
74+
}
75+
76+
$currentQueueSize = $this->getRunningSize();
77+
if ($currentQueueSize > $this->maxSize) {
78+
throw new \PHPStan\ShouldNotHappenException('Running overflow');
79+
}
80+
81+
if ($currentQueueSize === $this->maxSize) {
82+
$this->logger->log('Queue is full');
83+
return;
84+
}
85+
86+
$this->logger->log('Queue not full - looking at first item in the queue');
87+
88+
[$runnable, $runnableSize, $deferred] = $this->queue[0];
89+
90+
$newSize = $currentQueueSize + $runnableSize;
91+
if ($newSize > $this->maxSize) {
92+
$this->logger->log(
93+
sprintf(
94+
'Canot remote first item from the queue - it has size %d, current queue size is %d, new size would be %d',
95+
$runnableSize,
96+
$currentQueueSize,
97+
$newSize
98+
)
99+
);
100+
return;
101+
}
102+
103+
$this->logger->log(sprintf('Removing top item from queue - new size is %d', $newSize));
104+
105+
/** @var array{Runnable, int, Deferred} $popped */
106+
$popped = array_shift($this->queue);
107+
if ($popped[0] !== $runnable || $popped[1] !== $runnableSize || $popped[2] !== $deferred) {
108+
throw new \PHPStan\ShouldNotHappenException();
109+
}
110+
111+
$this->running->attach($runnable, [$runnableSize, $deferred]);
112+
$this->logger->log(sprintf('Running process %s', $runnable->getName()));
113+
$runnable->run()->then(function ($value) use ($runnable, $deferred): void {
114+
$this->logger->log(sprintf('Process %s finished successfully', $runnable->getName()));
115+
$deferred->resolve($value);
116+
$this->running->detach($runnable);
117+
$this->drainQueue();
118+
}, function (\Throwable $e) use ($runnable, $deferred): void {
119+
$this->logger->log(sprintf('Process %s finished unsuccessfully: %s', $runnable->getName(), $e->getMessage()));
120+
$deferred->reject($e);
121+
$this->running->detach($runnable);
122+
$this->drainQueue();
123+
});
124+
}
125+
126+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php declare(strict_types = 1);
2+
3+
namespace PHPStan\Process\Runnable;
4+
5+
interface RunnableQueueLogger
6+
{
7+
8+
public function log(string $message): void;
9+
10+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php declare(strict_types = 1);
2+
3+
namespace PHPStan\Process\Runnable;
4+
5+
class RunnableQueueLoggerStub implements RunnableQueueLogger
6+
{
7+
8+
/** @var string[] */
9+
private $messages = [];
10+
11+
/**
12+
* @return string[]
13+
*/
14+
public function getMessages(): array
15+
{
16+
return $this->messages;
17+
}
18+
19+
public function log(string $message): void
20+
{
21+
$this->messages[] = $message;
22+
}
23+
24+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
<?php declare(strict_types = 1);
2+
3+
namespace PHPStan\Process\Runnable;
4+
5+
use PHPUnit\Framework\TestCase;
6+
7+
class RunnableQueueTest extends TestCase
8+
{
9+
10+
public function testQueuedProcessIsRun(): void
11+
{
12+
$logger = new RunnableQueueLoggerStub();
13+
$queue = new RunnableQueue($logger, 8);
14+
15+
$one = new RunnableStub('1');
16+
$queue->queue($one, 1);
17+
$this->assertSame(0, $queue->getQueueSize());
18+
$this->assertSame(1, $queue->getRunningSize());
19+
$one->finish();
20+
$this->assertSame(0, $queue->getQueueSize());
21+
$this->assertSame(0, $queue->getRunningSize());
22+
$this->assertSame([
23+
'Queue not full - looking at first item in the queue',
24+
'Removing top item from queue - new size is 1',
25+
'Running process 1',
26+
'Process 1 finished successfully',
27+
'Queue empty',
28+
], $logger->getMessages());
29+
}
30+
31+
public function testComplexScenario(): void
32+
{
33+
$logger = new RunnableQueueLoggerStub();
34+
$queue = new RunnableQueue($logger, 8);
35+
36+
$one = new RunnableStub('1');
37+
$two = new RunnableStub('2');
38+
$three = new RunnableStub('3');
39+
$four = new RunnableStub('4');
40+
$queue->queue($one, 4);
41+
$this->assertSame(0, $queue->getQueueSize());
42+
$this->assertSame(4, $queue->getRunningSize());
43+
44+
$queue->queue($two, 2);
45+
$this->assertSame(0, $queue->getQueueSize());
46+
$this->assertSame(6, $queue->getRunningSize());
47+
$queue->queue($three, 3);
48+
$this->assertSame(3, $queue->getQueueSize());
49+
$this->assertSame(6, $queue->getRunningSize());
50+
$queue->queue($four, 4);
51+
$this->assertSame(7, $queue->getQueueSize());
52+
$this->assertSame(6, $queue->getRunningSize());
53+
54+
$one->finish();
55+
$this->assertSame(4, $queue->getQueueSize());
56+
$this->assertSame(5, $queue->getRunningSize());
57+
58+
$two->finish();
59+
$this->assertSame(0, $queue->getQueueSize());
60+
$this->assertSame(7, $queue->getRunningSize());
61+
62+
$three->finish();
63+
$this->assertSame(0, $queue->getQueueSize());
64+
$this->assertSame(4, $queue->getRunningSize());
65+
66+
$four->finish();
67+
$this->assertSame(0, $queue->getQueueSize());
68+
$this->assertSame(0, $queue->getRunningSize());
69+
70+
$this->assertSame([
71+
0 => 'Queue not full - looking at first item in the queue',
72+
1 => 'Removing top item from queue - new size is 4',
73+
2 => 'Running process 1',
74+
3 => 'Queue not full - looking at first item in the queue',
75+
4 => 'Removing top item from queue - new size is 6',
76+
5 => 'Running process 2',
77+
6 => 'Queue not full - looking at first item in the queue',
78+
7 => 'Canot remote first item from the queue - it has size 3, current queue size is 6, new size would be 9',
79+
8 => 'Queue not full - looking at first item in the queue',
80+
9 => 'Canot remote first item from the queue - it has size 3, current queue size is 6, new size would be 9',
81+
10 => 'Process 1 finished successfully',
82+
11 => 'Queue not full - looking at first item in the queue',
83+
12 => 'Removing top item from queue - new size is 5',
84+
13 => 'Running process 3',
85+
14 => 'Process 2 finished successfully',
86+
15 => 'Queue not full - looking at first item in the queue',
87+
16 => 'Removing top item from queue - new size is 7',
88+
17 => 'Running process 4',
89+
18 => 'Process 3 finished successfully',
90+
19 => 'Queue empty',
91+
20 => 'Process 4 finished successfully',
92+
21 => 'Queue empty',
93+
], $logger->getMessages());
94+
}
95+
96+
public function testCancel(): void
97+
{
98+
$logger = new RunnableQueueLoggerStub();
99+
$queue = new RunnableQueue($logger, 8);
100+
$one = new RunnableStub('1');
101+
$promise = $queue->queue($one, 4);
102+
$this->assertSame(0, $queue->getQueueSize());
103+
$this->assertSame(4, $queue->getRunningSize());
104+
105+
$promise->then(static function () use ($logger): void {
106+
$logger->log('Should not happen');
107+
}, static function (\Exception $e) use ($logger): void {
108+
$logger->log(sprintf('Else callback in test called: %s', $e->getMessage()));
109+
});
110+
$promise->cancel();
111+
112+
$this->assertSame(0, $queue->getQueueSize());
113+
$this->assertSame(0, $queue->getRunningSize());
114+
115+
$this->assertSame([
116+
0 => 'Queue not full - looking at first item in the queue',
117+
1 => 'Removing top item from queue - new size is 4',
118+
2 => 'Running process 1',
119+
3 => 'Process 1 finished unsuccessfully: Runnable 1 canceled',
120+
4 => 'Else callback in test called: Runnable 1 canceled',
121+
5 => 'Queue empty',
122+
], $logger->getMessages());
123+
}
124+
125+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php declare(strict_types = 1);
2+
3+
namespace PHPStan\Process\Runnable;
4+
5+
use React\Promise\Deferred;
6+
use React\Promise\PromiseInterface;
7+
8+
class RunnableStub implements Runnable
9+
{
10+
11+
/** @var string */
12+
private $name;
13+
14+
/** @var Deferred */
15+
private $deferred;
16+
17+
public function __construct(string $name)
18+
{
19+
$this->name = $name;
20+
$this->deferred = new Deferred();
21+
}
22+
23+
public function getName(): string
24+
{
25+
return $this->name;
26+
}
27+
28+
public function finish(): void
29+
{
30+
$this->deferred->resolve();
31+
}
32+
33+
public function run(): PromiseInterface
34+
{
35+
return $this->deferred->promise();
36+
}
37+
38+
public function cancel(): void
39+
{
40+
$this->deferred->reject(new \PHPStan\Process\Runnable\RunnableCanceledException(sprintf('Runnable %s canceled', $this->getName())));
41+
}
42+
43+
}

0 commit comments

Comments
 (0)