Skip to content

Commit 82d2524

Browse files
committed
Add gearman transport.
1 parent ac4b8aa commit 82d2524

27 files changed

+1092
-7
lines changed

bin/test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ waitForService rabbitmq 5672 50
2424
waitForService mysql 3306 50
2525
waitForService redis 6379 50
2626
waitForService beanstalkd 11300
27+
waitForService gearmand 4730
2728

2829
php pkg/job-queue/Tests/Functional/app/console doctrine:database:create
2930
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force

composer.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"enqueue/dbal": "*@dev",
1515
"enqueue/sqs": "*@dev",
1616
"enqueue/pheanstalk": "*@dev",
17+
"enqueue/gearman": "*@dev",
1718
"enqueue/enqueue-bundle": "*@dev",
1819
"enqueue/job-queue": "*@dev",
1920
"enqueue/simple-client": "*@dev",
@@ -90,6 +91,10 @@
9091
"type": "path",
9192
"url": "pkg/pheanstalk"
9293
},
94+
{
95+
"type": "path",
96+
"url": "pkg/gearman"
97+
},
9398
{
9499
"type": "path",
95100
"url": "pkg/simple-client"

docker-compose.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ services:
99
- mysql
1010
- redis
1111
- beanstalkd
12+
- gearmand
1213
volumes:
1314
- './:/mqdev'
1415
environment:
@@ -34,6 +35,7 @@ services:
3435
- BEANSTALKD_HOST=beanstalkd
3536
- BEANSTALKD_PORT=11300
3637
- BEANSTALKD_DSN=beanstalk://beanstalkd:11300
38+
- GEARMAN_DSN=gearman://gearmand:4730
3739

3840
rabbitmq:
3941
image: enqueue/rabbitmq:latest
@@ -48,6 +50,9 @@ services:
4850
beanstalkd:
4951
image: 'schickling/beanstalkd'
5052

53+
gearmand:
54+
image: 'artefactual/gearmand'
55+
5156
redis:
5257
image: 'redis:3'
5358
ports:

docker/Dockerfile

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
1-
FROM ubuntu:16.04
1+
FROM formapro/nginx-php-fpm:latest-all-exts
22

33
## libs
44
RUN set -x && \
55
apt-get update && \
6-
apt-get install -y --no-install-recommends wget curl openssl ca-certificates nano netcat && \
7-
apt-get install -y --no-install-recommends php php-mysql php-redis php-curl php-intl php-mbstring php-zip php-mcrypt php-xdebug php-bcmath php-xml php-amqp
6+
apt-get install -y --no-install-recommends wget curl openssl ca-certificates nano netcat
87

98
## confis
109

1110
# RUN rm -f /etc/php/7.0/cli/conf.d/*xdebug.ini
1211

13-
COPY ./php/cli.ini /etc/php/7.0/cli/conf.d/1-dev_cli.ini
12+
COPY ./php/cli.ini /etc/php/7.1/cli/conf.d/1-dev_cli.ini
1413
COPY ./bin/dev_entrypoiny.sh /usr/local/bin/entrypoint.sh
1514
RUN chmod u+x /usr/local/bin/entrypoint.sh
1615

docker/php/cli.ini

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,3 @@ memory_limit = 2G
44
max_execution_time=0
55
date.timezone=UTC
66
variables_order="EGPCS"
7-
8-
extension=amqp.so

phpunit.xml.dist

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@
5353
<directory>pkg/pheanstalk/Tests</directory>
5454
</testsuite>
5555

56+
<testsuite name="gearman transport">
57+
<directory>pkg/gearman/Tests</directory>
58+
</testsuite>
59+
5660
<testsuite name="enqueue-bundle">
5761
<directory>pkg/enqueue-bundle/Tests</directory>
5862
</testsuite>

pkg/enqueue/functions.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
use Enqueue\Consumption\QueueConsumer;
77
use Enqueue\Dbal\DbalConnectionFactory;
88
use Enqueue\Fs\FsConnectionFactory;
9+
use Enqueue\Gearman\GearmanConnectionFactory;
910
use Enqueue\Null\NullConnectionFactory;
11+
use Enqueue\Pheanstalk\PheanstalkConnectionFactory;
1012
use Enqueue\Psr\PsrConnectionFactory;
1113
use Enqueue\Psr\PsrContext;
1214

@@ -48,6 +50,14 @@ function dsn_to_connection_factory($dsn)
4850
$map['pdo_sqlite'] = DbalConnectionFactory::class;
4951
}
5052

53+
if (class_exists(GearmanConnectionFactory::class)) {
54+
$map['gearman'] = GearmanConnectionFactory::class;
55+
}
56+
57+
if (class_exists(PheanstalkConnectionFactory::class)) {
58+
$map['beanstalk'] = PheanstalkConnectionFactory::class;
59+
}
60+
5161
list($scheme) = explode('://', $dsn);
5262
if (false == $scheme || false === strpos($dsn, '://')) {
5363
throw new \LogicException(sprintf('The scheme could not be parsed from DSN "%s"', $dsn));

pkg/gearman/.travis.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 1
5+
6+
language: php
7+
8+
php:
9+
- '5.6'
10+
- '7.0'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?php
2+
3+
namespace Enqueue\Gearman;
4+
5+
use Enqueue\Psr\PsrConnectionFactory;
6+
7+
class GearmanConnectionFactory implements PsrConnectionFactory
8+
{
9+
/**
10+
* @var array
11+
*/
12+
private $config;
13+
14+
/**
15+
* The config could be an array, string DSN or null. In case of null it will attempt to connect to localhost with default settings.
16+
*
17+
* [
18+
* 'host' => 'localhost',
19+
* 'port' => 11300
20+
* ]
21+
*
22+
* or
23+
*
24+
* gearman://host:port
25+
*
26+
* @param array|string $config
27+
*/
28+
public function __construct($config = 'gearman://')
29+
{
30+
if (empty($config) || 'gearman://' === $config) {
31+
$config = [];
32+
} elseif (is_string($config)) {
33+
$config = $this->parseDsn($config);
34+
} elseif (is_array($config)) {
35+
} else {
36+
throw new \LogicException('The config must be either an array of options, a DSN string or null');
37+
}
38+
39+
$this->config = array_replace($this->defaultConfig(), $config);
40+
}
41+
42+
/**
43+
* {@inheritdoc}
44+
*
45+
* @return GearmanContext
46+
*/
47+
public function createContext()
48+
{
49+
return new GearmanContext($this->config);
50+
}
51+
52+
/**
53+
* @param string $dsn
54+
*
55+
* @return array
56+
*/
57+
private function parseDsn($dsn)
58+
{
59+
$dsnConfig = parse_url($dsn);
60+
if (false === $dsnConfig) {
61+
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
62+
}
63+
64+
$dsnConfig = array_replace([
65+
'scheme' => null,
66+
'host' => null,
67+
'port' => null,
68+
'user' => null,
69+
'pass' => null,
70+
'path' => null,
71+
'query' => null,
72+
], $dsnConfig);
73+
74+
if ('gearman' !== $dsnConfig['scheme']) {
75+
throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "gearman" only.', $dsnConfig['scheme']));
76+
}
77+
78+
return [
79+
'port' => $dsnConfig['port'],
80+
'host' => $dsnConfig['host'],
81+
];
82+
}
83+
84+
/**
85+
* @return array
86+
*/
87+
private function defaultConfig()
88+
{
89+
return [
90+
'host' => \GEARMAN_DEFAULT_TCP_HOST,
91+
'port' => \GEARMAN_DEFAULT_TCP_PORT,
92+
];
93+
}
94+
}

pkg/gearman/GearmanConsumer.php

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
<?php
2+
3+
namespace Enqueue\Gearman;
4+
5+
use Enqueue\Psr\PsrConsumer;
6+
use Enqueue\Psr\PsrMessage;
7+
8+
class GearmanConsumer implements PsrConsumer
9+
{
10+
/**
11+
* @var \GearmanWorker
12+
*/
13+
private $worker;
14+
15+
/**
16+
* @var GearmanDestination
17+
*/
18+
private $destination;
19+
20+
/**
21+
* @param \GearmanWorker $worker
22+
* @param GearmanDestination $destination
23+
*/
24+
public function __construct(\GearmanWorker $worker, GearmanDestination $destination)
25+
{
26+
$this->worker = $worker;
27+
$this->destination = $destination;
28+
}
29+
30+
/**
31+
* {@inheritdoc}
32+
*
33+
* @return GearmanDestination
34+
*/
35+
public function getQueue()
36+
{
37+
return $this->destination;
38+
}
39+
40+
/**
41+
* {@inheritdoc}
42+
*
43+
* @return GearmanMessage
44+
*/
45+
public function receive($timeout = 0)
46+
{
47+
set_error_handler(function ($severity, $message, $file, $line) {
48+
throw new \ErrorException($message, 0, $severity, $file, $line);
49+
});
50+
51+
$this->worker->setTimeout($timeout);
52+
53+
try {
54+
$message = null;
55+
56+
$this->worker->addFunction($this->destination->getName(), function (\GearmanJob $job) use (&$message) {
57+
$message = GearmanMessage::jsonUnserialize($job->workload());
58+
});
59+
60+
while ($this->worker->work());
61+
} finally {
62+
restore_error_handler();
63+
}
64+
65+
return $message;
66+
}
67+
68+
/**
69+
* {@inheritdoc}
70+
*/
71+
public function receiveNoWait()
72+
{
73+
return $this->receive(100);
74+
}
75+
76+
/**
77+
* {@inheritdoc}
78+
*/
79+
public function acknowledge(PsrMessage $message)
80+
{
81+
}
82+
83+
/**
84+
* {@inheritdoc}
85+
*/
86+
public function reject(PsrMessage $message, $requeue = false)
87+
{
88+
}
89+
90+
/**
91+
* @return \GearmanWorker
92+
*/
93+
public function getWorker()
94+
{
95+
return $this->worker;
96+
}
97+
}

0 commit comments

Comments
 (0)