8000 blocking messenger consumer · symfony/symfony@b1f78e6 · GitHub
[go: up one dir, main page]

Skip to content

Commit b1f78e6

Browse files
committed
blocking messenger consumer
1 parent 56cb40f commit b1f78e6

File tree

7 files changed

+148
-18
lines changed

7 files changed

+148
-18
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/ 8000 Transport/AmqpReceiver.php

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1717
use Symfony\Component\Messenger\Exception\TransportException;
18-
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
1918
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
19+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
2020
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2121
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2222
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -26,7 +26,7 @@
2626
*
2727
* @author Samuel Roze <samuel.roze@gmail.com>
2828
*/
29-
class AmqpReceiver implements QueueReceiverInterface, BlockingReceiverInterface, MessageCountAwareInterface
29+
class AmqpReceiver implements QueueReceiverInterface, QueueBlockingReceiverInterface, MessageCountAwareInterface
3030
{
3131
private SerializerInterface $serializer;
3232
private Connection $connection;
@@ -47,12 +47,18 @@ public function get(): iterable
4747
*/
4848
public function pull(callable $callback): void
4949
{
50-
if (0 === count($this->connection->getQueueNames())) {
50+
$this->pullFromQueues($this->connection->getQueueNames(), $callback);
51+
}
52+
53+
/**
54+
* {@inheritdoc}
55+
*/
56+
public function pullFromQueues(array $queueNames, callable $callback): void
57+
{
58+
if (0 === count($queueNames)) {
5159
return;
5260
}
5361

54-
$queueNames = $this->connection->getQueueNames();
55-
5662
// Pop last queue to send callback
5763
$firstQueue = array_pop($queueNames);
5864

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15-
use Symfony\Component\Messenger\Exception\TransportException;
16-
use Symfony\Component\Messenger\Tran 9E88 sport\Receiver\BlockingReceiverInterface;
1715
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
16+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
1817
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
1918
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2019
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -24,7 +23,7 @@
2423
/**
2524
* @author Nicolas Grekas <p@tchwork.com>
2625
*/
27-
class AmqpTransport implements QueueReceiverInterface, BlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
26+
class AmqpTransport implements QueueReceiverInterface, QueueBlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
2827
{
2928
private SerializerInterface $serializer;
3029
private Connection $connection;
@@ -50,6 +49,14 @@ public function pull(callable $callback): void
5049
$this->getReceiver()->pull($callback);
5150
}
5251

52+
/**
53+
* {@inheritdoc}
54+
*/
55+
public function pullFromQueues(array $queueNames, callable $callback): void
56+
{
57+
$this->receiver->pullFromQueues($queueNames, $callback);
58+
}
59+
5360
/**
5461
* {@inheritdoc}
5562
*/

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ protected function configure(): void
124124
125125
Use the --blocking-mode option to force receiver to work in blocking mode
126126
("consume" method will be used instead of "get" in RabbitMQ for example).
127-
--queues option will be ignored.
128127
Only supported by some receivers, and you should pass only one receiver:
129128
130129
<info>php %command.full_name% <receiver-name> --blocking-mode</info>

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
use Symfony\Component\Messenger\RoutableMessageBus;
2929
use Symfony\Component\Messenger\Stamp\BusNameStamp;
3030
use Symfony\Component\Messenger\Tests\ResettableDummyReceiver;
31-
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
31+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
3232
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3333

3434
class ConsumeMessagesCommandTest extends TestCase
@@ -77,8 +77,8 @@ public function testRunWithBlockingModeOption()
7777
{
7878
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
7979

80-
$receiver = $this->createMock(BlockingReceiverInterface::class);
81-
$receiver->expects($this->once())->method('pull')->willReturnCallback(function (callable $callback) use ($envelope) {
80+
$receiver = $this->createMock(QueueBlockingReceiverInterface::class);
81+
$receiver->expects($this->once())->method('pullFromQueues')->willReturnCallback(function (array $queueNames, callable $callback) use ($envelope) {
8282
call_user_func($callback, $envelope);
8383
});
8484

@@ -102,6 +102,7 @@ public function testRunWithBlockingModeOption()
102102
'receivers' => ['dummy-receiver'],
103103
'--limit' => 1,
104104
'--blocking-mode' => true,
105+
'--queues' => ['foo'],
105106
]);
106107

107108
$tester->assertCommandIsSuccessful();

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
use Symfony\Component\Messenger\Stamp\StampInterface;
4444
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
4545
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
46+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
4647
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
4748
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
4849
use Symfony\Component\Messenger\Worker;
@@ -653,6 +654,72 @@ public function testMoreThanOneReceiverInBlockingMode()
653654
$this->expectExceptionMessage('In blocking mode only one receiver is supported');
654655
$worker->run(['blocking-mode' => true]);
655656
}
657+
658+
public function testWorkerLimitQueuesInBlockingMode()
659+
{
660+
$apiMessage = new DummyMessage('API');
661+
$ipaMessage = new DummyMessage('IPA');
662+
663+
$receiver = new QueueBlockingDummyReceiver([
664+
[new Envelope($apiMessage), new Envelope($ipaMessage)],
665+
]);
666+
667+
$bus = $this->createMock(MessageBusInterface::class);
668+
$envelopes = [];
669+
670+
$bus->expects($this->exactly(2))
671+
->method('dispatch')
672+
->willReturnCallback(function ($envelope) use (&$envelopes) {
673+
return $envelopes[] = $envelope;
674+
});
675+
676+
$dispatcher = new class() implements EventDispatcherInterface {
677+
private StopWorkerOnMessageLimitListener $listener;
678+
679+
public function __construct()
680+
{
681+
$this->listener = new StopWorkerOnMessageLimitListener(2);
682+
}
683+
684+
public function dispatch(object $event): object
685+
{
686+
if ($event instanceof WorkerRunningEvent) {
687+
$this->listener->onWorkerRunning($event);
688+
}
689+
690+
return $event;
691+
}
692+
};
693+
694+
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
695+
$worker->run([
696+
'blocking-mode' => true,
697+
'queues' => ['foo']
< 1241 div aria-hidden="true" class="position-absolute top-0 d-flex user-select-none DiffLineTableCellParts-module__comment-indicator--eI0hb">
698+
]);
699+
700+
$this->assertSame($apiMessage, $envelopes[0]->getMessage());
701+
$this->assertSame($ipaMessage, $envelopes[1]->getMessage());
702+
$this->assertCount(1, $envelopes[0]->all(ReceivedStamp::class));
703+
$this->assertCount(1, $envelopes[0]->all(ConsumedByWorkerStamp::class));
704+
$this->assertSame('transport', $envelopes[0]->last(ReceivedStamp::class)->getTransportName());
705+
706+
$this->assertSame(2, $receiver->getAcknowledgeCount());
707+
}
708+
709+
public function testWorkerLimitQueuesUnsupportedInBlockingMode()
710+
{
711+
$receiver = $this->createMock(BlockingReceiverInterface::class);
712+
713+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
714+
715+
$worker = new Worker(['transport' => $receiver], $bus);
716+
$this->expectException(RuntimeException::class);
717+
$this->expectExceptionMessage(sprintf('Receiver for "transport" does not implement "%s".', QueueBlockingReceiverInterface::class));
718+
$worker->run([
719+
'queues' => ['foo'],
720+
'blocking-mode' => true,
721+
]);
722+
}
656723
}
657724

658725
class DummyReceiver implements ReceiverInterface
@@ -722,6 +789,14 @@ public function pull(callable $callback): void
722789
}
723790
}
724791

792+
class QueueBlockingDummyReceiver extends BlockingDummyReceiver implements QueueBlockingReceiverInterface
793+
{
794+
public function pullFromQueues(array $queueNames, callable $callback): void
795+
{
796+
$this->pull($callback);
797+
}
798+
}
799+
725800
class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface
726801
{
727802
public function getFromQueues(array $queueNames): iterable
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Receiver;
13+
14+
/**
15+
* Some transports may have multiple queues. This interface is used to read from only some queues in blocking mode.
16+
*
17+
* @author Alexander Melikhov <amelihovv@ya.ru>
18+
*/
19+
interface QueueBlockingReceiverInterface extends BlockingReceiverInterface
20+
{
21+
/**
22+
* Pull messages from the specified queue names instead of consuming from all queues.
23+
*
24+
* @param string[] $queueNames
25+
* @param callable(\AMQPEnvelope):?false $callback If callback return false, then processing thread will be
26+
* returned to PHP script.
27+
*/
28+
public function pullFromQueues(array $queueNames, callable $callback): void;
29+
}

src/Symfony/Component/Messenger/Worker.php

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
3232
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
3333
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
34+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
3435
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
3536
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3637
use Symfony\Component\RateLimiter\LimiterInterface;
@@ -99,8 +100,14 @@ public function run(array $options = []): void
99100
if ($queueNames) {
100101
// if queue names are specified, all receivers must implement the QueueReceiverInterface
101102
foreach ($this->receivers as $transportName => $receiver) {
102-
if (!$receiver instanceof QueueReceiverInterface) {
103-
throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
103+
if ($blockingMode) {
104+
if (!$receiver instanceof QueueBlockingReceiverInterface) {
105+
throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueBlockingReceiverInterface::class));
106+
}
107+
} else {
108+
if (!$receiver instanceof QueueReceiverInterface) {
109+
throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
110+
}
104111
}
105112
}
106113
}
@@ -110,9 +117,7 @@ public function run(array $options = []): void
110117
$envelopeHandledStart = $this->clock->now();
111118
foreach ($this->receivers as $transportName => $receiver) {
112119
if ($blockingMode) {
113-
/** @var BlockingReceiverInterface $receiver */
114-
115-
$receiver->pull(function (Envelope $envelope) use ($transportName, &$envelopeHandled) {
120+
$callback = function (Envelope $envelope) use ($transportName, &$envelopeHandled) {
116121
$envelopeHandled = true;
117122

118123
$this->rateLimit($transportName);
@@ -122,7 +127,15 @@ public function run(array $options = []): void
122127
if ($this->shouldStop) {
123128
return false;
124129
}
125-
});
130+
};
131+
132+
if ($queueNames) {
133+
/** @var QueueBlockingReceiverInterface $receiver */
134+
$receiver->pullFromQueues($queueNames, $callback);
135+
} else {
136+
/** @var BlockingReceiverInterface $receiver */
137+
$receiver->pull($callback);
138+
}
126139
} else {
127140
if ($queueNames) {
128141
$envelopes = $receiver->getFromQueues($queueNames);

0 commit comments

Comments
 (0)
0