8000 Merge pull request #23 from sylfabre/patch-2-consumer-polling · queue-interop/queue-interop@eefee25 · GitHub
[go: up one dir, main page]

Skip to content

Commit eefee25

Browse files
authored
Merge pull request #23 from sylfabre/patch-2-consumer-polling
Consumer polling
2 parents f01c5fb + e6cee0d commit eefee25

File tree

3 files changed

+147
-1
lines changed

3 files changed

+147
-1
lines changed

src/Impl/ConsumerPollingTrait.php

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace Interop\Queue\Impl;
5+
6+
use Interop\Queue\Message;
7+
8+
/**
9+
* The ConsumerPollingTrait is a common implementation of the polling algorithm for a Consumer
10+
*/
11+
trait ConsumerPollingTrait
12+
{
13+
/**
14+
* Polling interval in milliseconds
15+
* @var integer
16+
*/
17+
protected $pollingInterval = 1000;
18+
19+
/**
20+
* Set polling interval in milliseconds.
21+
*/
22+
public function setPollingInterval(int $msec): self
23+
{
24+
$this->pollingInterval = $msec;
25+
return $this;
26+
}
27+
28+
/**
29+
* Get polling interval in milliseconds.
30+
*/
31+
public function getPollingInterval(): int
32+
{
33+
return $this->pollingInterval;
34+
}
35+
36+
public function receive(int $timeout = 0): ?Message
37+
{
38+
$timeout *= 1000; // from milliseconds to microseconds
39+
$startAt = microtime(true);
40+
41+
while(true) {
42+
$message = $this->receiveNoWait();
43+
44+
if($message) {
45+
return $message;
46+
}
47+
48+
if($timeout) {
49+
50+
$timeSpent = microtime(true) - $startAt;
51+
$timeSpent *= 1000000; // from seconds to microseconds
52+
$timeLeft = $timeout - $timeSpent;
53+
54+
// No time left to wait
55+
if($timeLeft <= 0) {
56+
return null;
57+
}
58+
59+
// We pay attention not to wait too long to go over the timeout limit
60+
$sleep = min($timeLeft, $this->pollingInterval * 1000);
61+
62+
} else {
63+
$sleep = $this->pollingInterval * 1000;
64+
}
65+
66+
usleep(intval($sleep));
67+
}
68+
69+
return null;
70+
}
71+
72+
public abstract function receiveNoWait(): ?Message;
73+
}

tests/Impl/ConsumerPollingTest.php

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?php
2+
namespace Interop\Queue\Tests\Impl;
3+
4+
use Interop\Queue\Impl\ConsumerPollingTrait;
5+
use Interop\Queue\Message;
6+
use PHPUnit\Framework\TestCase;
7+
8+
class ConsumerPollingTest extends TestCase
9+
{
10+
11+
public function getConsumer(callable $receiveNoWait)
12+
{
13+
return new class($receiveNoWait)
14+
{
15+
use ConsumerPollingTrait;
16+
17+
private $count = 0;
18+
private $callable;
19+
20+
public function __construct(callable $receiveNoWait)
21+
{
22+
$this->callable = $receiveNoWait;
23+
}
24+
25+
public function receiveNoWait(): ?Message
26+
{
27+
$callable = $this->callable;
28+
return $callable($this->count);
29+
}
30+
};
31+
}
32+
33+
public function testShouldTimeout()
34+
{
35+
$consumer = $this->getConsumer(function() {
36+
return null;
37+
});
38+
39+
$start = microtime(true);
40+
41+
$consumer->receive(250);
42+
43+
$timer = microtime(true) - $start;
44+
45+
$this->assertGreaterThan(0.250, $timer);
46+
$this->assertLessThan(0.255, $timer);
47+
}
48+
49+
public function testShouldCallThreeTimes()
50+
{
51+
$message = $this->getMockForAbstractClass(Message::class);
52+
53+
$consumer = $this->getConsumer(function(int &$count) use($message) {
54+
$count++;
55+
if(3 === $count) {
56+
return $message;
57+
}
58+
return null;
59+
});
60+
$consumer->setPollingInterval(100);
61+
62+
$start = microtime(true);
63+
64+
$this->assertSame($message, $consumer->receive(250));
65+
66+
$timer = microtime(true) - $start;
67+
68+
$this->assertGreaterThan(0.200, $timer);
69+
$this->assertLessThan(0.205, $timer);
70+
}
71+
}

tests/Spec/MessageTest.php renamed to tests/Impl/MessageTest.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
<?php
2-
namespace Interop\Queue\Tests\Spec;
2+
declare(strict_types=1);
3+
4+
namespace Interop\Queue\Tests\Impl;
35

46
use Interop\Queue\Message;
57
use Interop\Queue\Spec\MessageSpec;

0 commit comments

Comments
 (0)
0