8000 blocking messenger consumer · symfony/symfony@56cb40f · GitHub
[go: up one dir, main page]

Skip to content

Commit 56cb40f

Browse files
committed
blocking messenger consumer
; Conflicts: ; src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php ; src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php ; src/Symfony/Component/Messenger/Worker.php
1 parent 2ca1643 commit 56cb40f

File tree

11 files changed

+350
-16
lines changed

11 files changed

+350
-16
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,32 @@ public function testItReturnsTheDecodedMessageToTheHandler()
4646
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
4747
}
4848

49+
public function testItReturnsTheDecodedMessageToTheHandlerInBlockingMode()
50+
{
51+
$connection = $this->getMockBuilder(Connection::class)
52+
->disableOriginalConstructor()
53+
->onlyMethods(['getQueueNames', 'pull'])
54+
->getMock();
55+
$serializer = new Serializer(
56+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
57+
);
58+
59+
$amqpEnvelope = $this->createAMQPEnvelope();
60+
61+
$amqpQueue = $this->createMock(\AMQPQueue::class);
62+
$amqpQueue->method('getName')->willReturn('queueName');
63+
64+
$connection->method('getQueueNames')->willReturn(['queueName']);
65+
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
66+
call_user_func($callback, $amqpEnvelope, $amqpQueue);
67+
});
68+
69+
$receiver = new AmqpReceiver($connection, $serializer);
70+
$receiver->pull(function (Envelope $envelope) {
71+
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
72+
});
73+
}
74+
4975
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
5076
{
5177
$this->expectException(TransportException::class);

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,36 @@ public function testReceivesMessages()
5252
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5353
}
5454

55+
public function testReceivesMessagesInBlockingMode()
56+
{
57+
$transport = $this->getTransport(
58+
$serializer = $this->createMock(SerializerInterface::class),
59+
$connection = $this->getMockBuilder(Connection::class)
60+
->disableOriginalConstructor()
61+
->onlyMethods(['getQueueNames', 'pull'])
62+
->getMock(),
63+
);
64+
65+
$decodedMessage = new DummyMessage('Decoded.');
66+
67+
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
68+
$amqpEnvelope->method('getBody')->willReturn('body');
69+
$amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']);
70+
71+
$amqpQueue = $this->createMock(\AMQPQueue::class);
72+
$amqpQueue->method('getName')->willReturn('queueName');
73+
74+
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
75+
$connection->method('getQueueNames')->willReturn(['queueName']);
76+
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
77+
call_user_func($callback, $amqpEnvelope, $amqpQueue);
78+
});
79+
80+
$transport->pull(function (Envelope $envelope) use ($decodedMessage) {
81+
$this->assertSame($decodedMessage, $envelope->getMessage());
82+
});
83+
}
84+
5585
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null): AmqpTransport
5686
{
5787
$serializer ??= $this->createMock(SerializerInterface::class);

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1717
use Symfony\Component\Messenger\Exception\TransportException;
18+
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -25,7 +26,7 @@
2526
*
2627
* @author Samuel Roze <samuel.roze@gmail.com>
2728
*/
28-
class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
29+
class AmqpReceiver implements QueueReceiverInterface, BlockingReceiverInterface, MessageCountAwareInterface
2930
{
3031
private SerializerInterface $serializer;
3132
private Connection $connection;
@@ -41,6 +42,49 @@ public function get(): iterable
4142
yield from $this->getFromQueues($this->connection->getQueueNames());
4243
}
4344

45+
/**
46+
* {@inheritdoc}
47+
*/
48+
public function pull(callable $callback): void
49+
{
50+
if (0 === count($this->connection->getQueueNames())) {
51+
return;
52+
}
53+
54+
$queueNames = $this->connection->getQueueNames();
55+
56+
// Pop last queue to send callback
57+
$firstQueue = array_pop($queueNames);
58+
59+
foreach ($queueNames as $queueName) {
60+
$this->pullEnvelope($queueName, null);
61+
}
62+
63+
$this->pullEnvelope($firstQueue, $callback);
64+
}
65+
66+
private function pullEnvelope(string $queueName, ?callable $callback): void
67+
{
68+
if ($callback !== null) {
69+
$callback = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $queue) use ($callback) {
70+
$queueName = $queue->getName();
71+
$body = $amqpEnvelope->getBody();
72+
$envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName);
73+
74+
return $callback($envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName)));
75+
};
76+
}
77+
78+
try {
79+
$this->connection->pull($queueName, $callback);
80+
} catch (\AMQPException $exception) {
81+
throw new TransportException($exception->getMessage(), 0, $exception);
82+
}
83+
}
84+
85+
/**
86+
* {@inheritdoc}
87+
*/
4488
public function getFromQueues(array $queueNames): iterable
4589
{
4690
foreach ($queueNames as $queueName) {
@@ -61,9 +105,15 @@ private function getEnvelope(string $queueName): iterable
61105
}
62106

63107
$body = $amqpEnvelope->getBody();
108+
$envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName);
64109

110+
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
111+
}
112+
113+
private function decodeAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, false|string $body, string $queueName): Envelope
114+
{
65115
try {
66-
$envelope = $this->serializer->decode([
116+
return $this->serializer->decode([
67117
'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
68118
'headers' => $amqpEnvelope->getHeaders(),
69119
]);
@@ -73,8 +123,6 @@ private function getEnvelope(string $queueName): iterable
73123

74124
throw $exception;
75125
}
76-
77-
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
78126
}
79127

80128
public function ack(Envelope $envelope): void

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\TransportException;
16+
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
1517
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1618
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
1719
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -22,7 +24,7 @@
2224
/**
2325
* @author Nicolas Grekas <p@tchwork.com>
2426
*/
25-
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
27+
class AmqpTransport implements QueueReceiverInterface, BlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
2628
{
2729
private SerializerInterface $serializer;
2830
private Connection $connection;
@@ -40,6 +42,17 @@ public function get(): iterable
4042
return $this->getReceiver()->get();
4143
}
4244

45+
/**
46+
* {@inheritdoc}
47+
*/
48+
public function pull(callable $callback): void
49+
{
50+
$this->getReceiver()->pull($callback);
51+
}
52+
53+
/**
54+
* {@inheritdoc}
55+
*/
4356
public function getFromQueues(array $queueNames): iterable
4457
{
4558
return $this->getReceiver()->getFromQueues($queueNames);

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,27 @@ public function get(string $queueName): ?\AMQPEnvelope
433433
return null;
434434
}
435435

436+
/**
437+
* Consume a message from the specified queue in blocking mode.
438+
*
439+
* @param ?callable(\AMQPEnvelope,\AMQPQueue):?false $callback If callback return false, then processing thread will be
440+
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
441+
* will be made available to the first real callback registered. That allows one to have a single callback
442+
* consuming from multiple queues.
443+
*
444+
* @throws \AMQPException
445+
*/
446+
public function pull(string $queueName, ?callable $callback): void
447+
{
448+
$this->clearWhenDisconnected();
449+
450+
if ($this->autoSetupExchange) {
451+
$this->setupExchangeAndQueues();
452+
}
453+
454+
$this->queue($queueName)->consume($callback);
455+
}
456+
436457
public function ack(\AMQPEnvelope $message, string $queueName): bool
437458
{
438459
return $this->queue($queueName)->ack($message->getDeliveryTag());

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
6.3
55
---
66

7+
* Add `--blocking-mode` option to `messenger:consume` (will use more efficient `consume` method instead of `get` method in amqp transport)
78
* Add support for namespace wildcards in the HandlersLocator to allow routing multiple messages within the same namespace
89
* Deprecate `Symfony\Component\Messenger\Transport\InMemoryTransport` and
910
`Symfony\Component\Messenger\Transport\InMemoryTransportFactory` in favor of

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
3434
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
3535
use Symfony\Component\Messenger\RoutableMessageBus;
36+
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
3637
use Symfony\Component\Messenger\Worker;
3738

3839
/**
@@ -79,6 +80,7 @@ protected function configure(): void
7980
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
8081
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
8182
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
83+
new InputOption('blocking-mode', null, InputOption::VALUE_NONE, 'Consume messages in blocking mode. If option is specified only one receiver is supported'),
8284
])
8385
->setHelp(<<<'EOF'
8486
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -119,6 +121,13 @@ protected function configure(): void
119121
Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages):
120122
121123
<info>php %command.full_name% <receiver-name> --no-reset</info>
124+
125+
Use the --blocking-mode option to force receiver to work in blocking mode
126+
("consume" method will be used instead of "get" in RabbitMQ for example).
127+
--queues option will be ignored.
128+
Only supported by some receivers, and you should pass only one receiver:
129+
130+
<info>php %command.full_name% <receiver-name> --blocking-mode</info>
122131
EOF
123132
)
124133
;
@@ -225,6 +234,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
225234
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters);
226235
$options = [
227236
'sleep' => $input->getOption('sleep') * 1000000,
237+
'blocking-mode' => (bool) $input->getOption('blocking-mode'),
228238
];
229239
if ($queues = $input->getOption('queues')) {
230240
$options['queues'] = $queues;

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
use Symfony\Component\Messenger\RoutableMessageBus;
2929
use Symfony\Component\Messenger\Stamp\BusNameStamp;
3030
use Symfony\Component\Messenger\Tests\ResettableDummyReceiver;
31+
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
3132
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3233

3334
class ConsumeMessagesCommandTest extends TestCase
@@ -72,6 +73,41 @@ public function testBasicRun()
7273
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay());
7374
}
7475

76+
public function testRunWithBlockingModeOption()
77+
{
78+
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
79+
80+
$receiver = $this->createMock(BlockingReceiverInterface::class);
81+
$receiver->expects($this->once())->method('pull')->willReturnCallback(function (callable $callback) use ($envelope) {
82+
call_user_func($callback, $envelope);
83+
});
84+
85+
$receiverLocator = $this->createMock(ContainerInterface::class);
86+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
87+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
88+
89+
$bus = $this->createMock(MessageBusInterface::class);
90+
$bus->expects($this->once())->method('dispatch');
91+
92+
$busLocator = $this->createMock(ContainerInterface::class);
93+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
94+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
95+
96+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
97+
98+
$application = new Application();
99+
$application->add($command);
100+
$tester = new CommandTester($application->get('messenger:consume'));
101+
$tester->execute([
102+
'receivers' => ['dummy-receiver'],
103+
'--limit' => 1,
104+
'--blocking-mode' => true,
105+
]);
106+
107+
$tester->assertCommandIsSuccessful();
108+
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
109+
}
110+
75111
public function testRunWithBusOption()
76112
{
77113
$envelope = new Envelope(new \stdClass());

0 commit comments

Comments
 (0)
0