10000 [Messenger] Blocking consumer by melihovv · Pull Request #47226 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Blocking consumer #47226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
blocking messenger consumer
; Conflicts:
;	src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php
;	src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php
;	src/Symfony/Component/Messenger/Worker.php
  • Loading branch information
melihovv committed Jul 4, 2023
commit 56cb40f1f49e0fcb0c48b7eda2272da858488059
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ public function testItReturnsTheDecodedMessageToTheHandler()
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}

public function testItReturnsTheDecodedMessageToTheHandlerInBlockingMode()
{
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->onlyMethods(['getQueueNames', 'pull'])
->getMock();
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$amqpEnvelope = $this->createAMQPEnvelope();

$amqpQueue = $this->createMock(\AMQPQueue::class);
$amqpQueue->method('getName')->willReturn('queueName');

$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
call_user_func($callback, $amqpEnvelope, $amqpQueue);
});

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->pull(function (Envelope $envelope) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
});
}

public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$this->expectException(TransportException::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,36 @@ public function testReceivesMessages()
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
}

public function testReceivesMessagesInBlockingMode()
{
$transport = $this->getTransport(
$serializer = $this->createMock(SerializerInterface::class),
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->onlyMethods(['getQueueNames', 'pull'])
->getMock(),
);

$decodedMessage = new DummyMessage('Decoded.');

$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpEnvelope->method('getBody')->willReturn('body');
$amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']);

$amqpQueue = $this->createMock(\AMQPQueue::class);
$amqpQueue->method('getName')->willReturn('queueName');

$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
call_user_func($callback, $amqpEnvelope, $amqpQueue);
});

$transport->pull(function (Envelope $envelope) use ($decodedMessage) {
$this->assertSame($decodedMessage, $envelope->getMessage());
});
}

private function getTransport(SerializerInterface $serializer = null, Connection $connection = null): AmqpTransport
{
$serializer ??= $this->createMock(SerializerInterface::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
Expand All @@ -25,7 +26,7 @@
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
class AmqpReceiver implements QueueReceiverInterface, BlockingReceiverInterface, MessageCountAwareInterface
{
private SerializerInterface $serializer;
private Connection $connection;
Expand All @@ -41,6 +42,49 @@ public function get(): iterable
yield from $this->getFromQueues($this->connection->getQueueNames());
}

/**
* {@inheritdoc}
*/
public function pull(callable $callback): void
{
if (0 === count($this->connection->getQueueNames())) {
return;
}

$queueNames = $this->connection->getQueueNames();

// Pop last queue to send callback
$firstQueue = array_pop($queueNames);

foreach ($queueNames as $queueName) {
$this->pullEnvelope($queueName, null);
}

$this->pullEnvelope($firstQueue, $callback);
}

private function pullEnvelope(string $queueName, ?callable $callback): void
{
if ($callback !== null) {
$callback = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $queue) use ($callback) {
$queueName = $queue->getName();
$body = $amqpEnvelope->getBody();
$envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName);

return $callback($envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName)));
};
}

try {
$this->connection->pull($queueName, $callback);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}

/**
* {@inheritdoc}
*/
public function getFromQueues(array $queueNames): iterable
{
foreach ($queueNames as $queueName) {
Expand All @@ -61,9 +105,15 @@ private function getEnvelope(string $queueName): iterable
}

$body = $amqpEnvelope->getBody();
$envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName);

yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
}

private function decodeAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, false|string $body, string $queueName): Envelope
{
try {
$envelope = $this->serializer->decode([
return $this->serializer->decode([
'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
'headers' => $amqpEnvelope->getHeaders(),
]);
Expand All @@ -73,8 +123,6 @@ private function getEnvelope(string $queueName): iterable

throw $exception;
}

yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
}

public function ack(Envelope $envelope): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
Expand All @@ -22,7 +24,7 @@
/**
* @author Nicolas Grekas <p@tchwork.com>
*/
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
class AmqpTransport implements QueueReceiverInterface, BlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareI 1E0A nterface
{
private SerializerInterface $serializer;
private Connection $connection;
Expand All @@ -40,6 +42,17 @@ public function get(): iterable
return $this->getReceiver()->get();
}

/**
* {@inheritdoc}
*/
public function pull(callable $callback): void
{
$this->getReceiver()->pull($callback);
}

/**
* {@inheritdoc}
*/
public function getFromQueues(array $queueNames): iterable
{
return $this->getReceiver()->getFromQueues($queueNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,27 @@ public function get(string $queueName): ?\AMQPEnvelope
return null;
}

/**
* Consume a message from the specified queue in blocking mode.
*
* @param ?callable(\AMQPEnvelope,\AMQPQueue):?false $callback If callback return false, then processing thread will be
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
* will be made available to the first real callback registered. That allows one to have a single callback
* consuming from multiple queues.
*
* @throws \AMQPException
*/
public function pull(string $queueName, ?callable $callback): void
{
$this->clearWhenDisconnected();

if ($this->autoSetupExchange) {
$this->setupExchangeAndQueues();
}

$this->queue($queueName)->consume($callback);
}

public function ack(\AMQPEnvelope $message, string $queueName): bool
{
return $this->queue($queueName)->ack($message->getDeliveryTag());
Expand Down
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CHANGELOG
6.3
---

* Add `--blocking-mode` option to `messenger:consume` (will use more efficient `consume` method instead of `get` method in amqp transport)
* Add support for namespace wildcards in the HandlersLocator to allow routing multiple messages within the same namespace
* Deprecate `Symfony\Component\Messenger\Transport\InMemoryTransport` and
`Symfony\Component\Messenger\Transport\InMemoryTransportFactory` in favor of
Expand Down
10 changes: 10 additions & 0 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
use Symfony\Component\Messenger\Worker;

/**
Expand Down Expand Up @@ -79,6 +80,7 @@ protected function configure(): void
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
new InputOption('blocking-mode', null, InputOption::VALUE_NONE, 'Consume messages in blocking mode. If option is specified only one receiver is supported'),
])
->setHelp(<<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
Expand Down Expand Up @@ -119,6 +121,13 @@ protected function configure(): void
Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages):

<info>php %command.full_name% <receiver-name> --no-reset</info>

Use the --blocking-mode option to force receiver to work in blocking mode
("consume" method will be used instead of "get" in RabbitMQ for example).
--queues option will be ignored.
Only supported by some receivers, and you should pass only one receiver:

<info>php %command.full_name% <receiver-name> --blocking-mode</info>
EOF
)
;
Expand Down Expand Up @@ -225,6 +234,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters);
$options = [
'sleep' => $input->getOption('sleep') * 1000000,
'blocking-mode' => (bool) $input->getOption('blocking-mode'),
];
if ($queues = $input->getOption('queues')) {
$options['queues'] = $queues;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Tests\ResettableDummyReceiver;
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

class ConsumeMessagesCommandTest extends TestCase
Expand Down Expand Up @@ -72,6 +73,41 @@ public function testBasicRun()
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay());
}

public function testRunWithBlockingModeOption()
{
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);

$receiver = $this->createMock(BlockingReceiverInterface::class);
$receiver->expects($this->once())->method('pull')->willReturnCallback(function (callable $callback) use ($envelope) {
call_user_func($callback, $envelope);
});

$receiverLocator = $this->createMock(ContainerInterface::class);
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);

$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch');

$busLocator = $this->createMock(ContainerInterface::class);
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);

$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());

$application = new Application();
$application->add($command);
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'receivers' => ['dummy-receiver'],
'--limit' => 1,
'--blocking-mode' => true,
]);

$tester->assertCommandIsSuccessful();
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
}

public function testRunWithBusOption()
{
$envelope = new Envelope(new \stdClass());
Expand Down
Loading
0