8000 blocking messenger consumer · symfony/symfony@4f0368c · GitHub
[go: up one dir, main page]

Skip to content

Commit 4f0368c

Browse files
committed
blocking messenger consumer
1 parent d4496a8 commit 4f0368c

File tree

7 files changed

+148
-18
lines changed

7 files changed

+148
-18
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1717
use Symfony\Component\Messenger\Exception\TransportException;
18-
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
1918
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
19+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
2020
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2121
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2222
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -26,7 +26,7 @@
2626
*
2727
* @author Samuel Roze <samuel.roze@gmail.com>
2828
*/
29-
class AmqpReceiver implements QueueReceiverInterface, BlockingReceiverInterface, MessageCountAwareInterface
29+
class AmqpReceiver implements QueueReceiverInterface, QueueBlockingReceiverInterface, MessageCountAwareInterface
3030
{
3131
private SerializerInterface $serializer;
3232
private Connection $connection;
@@ -50,12 +50,18 @@ public function get(): iterable
5050
*/
5151
public function pull(callable $callback): void
5252
{
53-
if (0 === count($this->connection->getQueueNames())) {
53+
$this->pullFromQueues($this->connection->getQueueNames(), $callback);
54+
}
55+
56+
/**
57+
* {@inheritdoc}
58+
*/
59+
public function pullFromQueues(array $queueNames, callable $callback): void
60+
{
61+
if (0 === count($queueNames)) {
5462
return;
5563
}
5664

57-
$queueNames = $this->connection->getQueueNames();
58-
5965
// Pop last queue to send callback
6066
$firstQueue = array_pop($queueNames);
6167

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15-
use Symfony\Component\Messenger\Exception\TransportException;
16-
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
1715
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
16+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
1817
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
1918
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2019
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -24,7 +23,7 @@
2423
/**
2524
* @author Nicolas Grekas <p@tchwork.com>
2625
*/
27-
class AmqpTransport implements QueueReceiverInterface, BlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
26+
class AmqpTransport implements QueueReceiverInterface, QueueBlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
2827
{
2928
private SerializerInterface $serializer;
3029
private Connection $connection;
@@ -53,6 +52,14 @@ public function pull(callable $callback): void
5352
$this->getReceiver()->pull($callback);
5453
}
5554

55+
/**
56+
* {@inheritdoc}
57+
*/
58+
public function pullFromQueues(array $queueNames, callable $callback): void
59+
{
60+
$this->receiver->pullFromQueues($queueNames, $callback);
61+
}
62+
5663
/**
5764
* {@inheritdoc}
5865
*/

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ protected function configure(): void
124124
125125
Use the F438 --blocking-mode option to force receiver to work in blocking mode
126126
("consume" method will be used instead of "get" in RabbitMQ for example).
127-
--queues option will be ignored.
128127
Only supported by some receivers, and you should pass only one receiver:
129128
130129
<info>php %command.full_name% <receiver-name> --blocking-mode</info>

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
use Symfony\Component\Messenger\RoutableMessageBus;
2828
use Symfony\Component\Messenger\Stamp\BusNameStamp;
2929
use Symfony\Component\Messenger\Tests\ResettableDummyReceiver;
30-
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
30+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
3131
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3232

3333
class ConsumeMessagesCommandTest extends TestCase
@@ -76,8 +76,8 @@ public function testRunWithBlockingModeOption()
7676
{
7777
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
7878

79-
$receiver = $this->createMock(BlockingReceiverInterface::class);
80-
$receiver->expects($this->once())->method('pull')->willReturnCallback(function (callable $callback) use ($envelope) {
79+
$receiver = $this->createMock(QueueBlockingReceiverInterface::class);
80+
$receiver->expects($this->once())->method('pullFromQueues')->willReturnCallback(function (array $queueNames, callable $callback) use ($envelope) {
8181
call_user_func($callback, $envelope);
8282
});
8383

@@ -101,6 +101,7 @@ public function testRunWithBlockingModeOption()
101101
'receivers' => ['dummy-receiver'],
102102
'--limit' => 1,
103103
'--blocking-mode' => true,
104+
'--queues' => ['foo'],
104105
]);
105106

106107
$tester->assertCommandIsSuccessful();

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
use Symfony\Component\Messenger\Stamp\StampInterface;
4242
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
4343
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
44+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
4445
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
4546
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
4647
use Symfony\Component\Messenger\Worker;
@@ -612,6 +613,72 @@ public function testMoreThanOneReceiverInBlockingMode()
612613
$this->expectExceptionMessage('In blocking mode only one receiver is supported');
613614
$worker->run(['blocking-mode' => true]);
614615
}
616+
617+
public function testWorkerLimitQueuesInBlockingMode()
618+
{
619+
$apiMessage = new DummyMessage('API');
620+
$ipaMessage = new DummyMessage('IPA');
621+
622+
$receiver = new QueueBlockingDummyReceiver([
623+
[new Envelope($apiMessage), new Envelope($ipaMessage)],
624+
]);
625+
626+
$bus = $this->createMock(MessageBusInterface::class);
627+
$envelopes = [];
628+
629+
$bus->expects($this->exactly(2))
630+
->method('dispatch')
631+
->willReturnCallback(function ($envelope) use (&$envelopes) {
632+
return $envelopes[] = $envelope;
633+
});
634+
635+
$dispatcher = new class() implements EventDispatcherInterface {
636+
private StopWorkerOnMessageLimitListener $listener;
637+
638+
public function __construct()
639+
{
640+
$this->listener = new StopWorkerOnMessageLimitListener(2);
641+
}
642+
643+
public function dispatch(object $event): object
644+
{
645+
F42D if ($event instanceof WorkerRunningEvent) {
646+
$this->listener->onWorkerRunning($event);
647+
}
648+
649+
return $event;
650+
}
651+
};
652+
653+
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
654+
$worker->run([
655+
'blocking-mode' => true,
656+
'queues' => ['foo']
657+
]);
658+
659+
$this->assertSame($apiMessage, $envelopes[0]->getMessage());
660+
$this->assertSame($ipaMessage, $envelopes[1]->getMessage());
661+
$this->assertCount(1, $envelopes[0]->all(ReceivedStamp::class));
662+
$this->assertCount(1, $envelopes[0]->all(ConsumedByWorkerStamp::class));
663+
$this->assertSame('transport', $envelopes[0]->last(ReceivedStamp::class)->getTransportName());
664+
665+
$this->assertSame(2, $receiver->getAcknowledgeCount());
666+
}
667+
668+
public function testWorkerLimitQueuesUnsupportedInBlockingMode()
669+
{
670+
$receiver = $this->createMock(BlockingReceiverInterface::class);
671+
672+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
673+
674+
$worker = new Worker(['transport' => $receiver], $bus);
675+
$this->expectException(RuntimeException::class);
676+
$this->expectExceptionMessage(sprintf('Receiver for "transport" does not implement "%s".', QueueBlockingReceiverInterface::class));
677+
$worker->run([
678+
'queues' => ['foo'],
679+
'blocking-mode' => true,
680+
]);
681+
}
615682
}
616683

617684
class DummyReceiver implements ReceiverInterface
@@ -681,6 +748,14 @@ public function pull(callable $callback): void
681748
}
682749
}
683750

751+
class QueueBlockingDummyReceiver extends BlockingDummyReceiver implements QueueBlockingReceiverInterface
752+
{
753+
public function pullFromQueues(array $queueNames, callable $callback): void
754+
{
755+
$this->pull($callback);
756+
}
757+
}
758+
684759
class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface
685760
{
686761
public function getFromQueues(array $queueNames): iterable
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Receiver;
13+
14+
/**
15+
* Some transports may have multiple queues. This interface is used to read from only some queues in blocking mode.
16+
*
17+
* @author Alexander Melikhov <amelihovv@ya.ru>
18+
*/
19+
interface QueueBlockingReceiverInterface extends BlockingReceiverInterface
20+
{
21+
/**
22+
* Pull messages from the specified queue names instead of consuming from all queues.
23+
*
24+
* @param string[] $queueNames
25+
* @param callable(\AMQPEnvelope):?false $callback If callback return false, then processing thread will be
26+
* returned to PHP script.
27+
*/
28+
public function pullFromQueues(array $queueNames, callable $callback): void;
29+
}

src/Symfony/Component/Messenger/Worker.php

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
2929
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
3030
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
31+
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
3132
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
3233
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3334

@@ -97,8 +98,14 @@ public function run(array $options = []): void
9798
if ($queueNames) {
9899
// if queue names are specified, all receivers must implement the QueueReceiverInterface
99100
foreach ($this->receivers as $transportName => $receiver) {
100-
if (!$receiver instanceof QueueReceiverInterface) {
101-
throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
101+
if ($blockingMode) {
102+
if (!$receiver instanceof QueueBlockingReceiverInterface) {
103+
throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueBlockingReceiverInterface::class));
104+
}
105+
} else {
106+
if (!$receiver instanceof QueueReceiverInterface) {
107+
throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
108+
}
102109
}
103110
}
104111
}
@@ -108,9 +115,7 @@ public function run(array $options = []): void
108115
$envelopeHandledStart = microtime(true);
109116
foreach ($this->receivers as $transportName => $receiver) {
110117
if ($blockingMode) {
111-
/** @var BlockingReceiverInterface $receiver */
112-
113-
$receiver->pull(function (Envelope $envelope) use ($transportName, &$envelopeHandled) {
118+
$callback = function (Envelope $envelope) use ($transportName, &$envelopeHandled) {
114119
$envelopeHandled = true;
115120

116121
$this->handleMessage($envelope, $transportName);
@@ -119,7 +124,15 @@ public function run(array $options = []): void
119124
if ($this->shouldStop) {
120125
return false;
121126
}
122-
});
127+
};
128+
129+
if ($queueNames) {
130+
/** @var QueueBlockingReceiverInterface $receiver */
131+
$receiver->pullFromQueues($queueNames, $callback);
132+
} else {
133+
/** @var BlockingReceiverInterface $receiver */
134+
$receiver->pull($callback);
135+
}
123136
} else {
124137
if ($queueNames) {
125138
$envelopes = $receiver->getFromQueues($queueNames);

0 commit comments

Comments
 (0)
0