diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index 9638de4e49092..54b22a321f086 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -11,8 +11,9 @@ CHANGELOG
to the `Envelope` then find the correct bus when receiving from
the transport. See `ConsumeMessagesCommand`.
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
- * [BC BREAK] 2 new methods were added to `ReceiverInterface`:
- `ack()` and `reject()`.
+ * [BC BREAK] 3 new methods were added to `ReceiverInterface`:
+ `ack()`, `reject()` and `get()`. The methods `receive()`
+ and `stop()` were removed.
* [BC BREAK] Error handling was moved from the receivers into
`Worker`. Implementations of `ReceiverInterface::handle()`
should now allow all exceptions to be thrown, except for transport
@@ -24,7 +25,9 @@ CHANGELOG
* The default command name for `ConsumeMessagesCommand` was
changed from `messenger:consume-messages` to `messenger:consume`
* `ConsumeMessagesCommand` has two new optional constructor arguments
- * `Worker` has 4 new option constructor arguments.
+ * [BC BREAK] The first argument to Worker changed from a single
+ `ReceiverInterface` to an array of `ReceiverInterface`.
+ * `Worker` has 3 new optional constructor arguments.
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
receiver no longer needs to call this.
* The `AmqpSender` will now retry messages using a dead-letter exchange
diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
index e49b28705c876..bd3a7a177c6c3 100644
--- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
@@ -19,12 +19,13 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\RoutableMessageBus;
-use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
-use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
-use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
use Symfony\Component\Messenger\Worker;
+use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
+use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
+use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
@@ -70,10 +71,11 @@ protected function configure(): void
$this
->setDefinition([
- new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
+ new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
+ new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
])
->setDescription('Consumes messages')
@@ -82,6 +84,10 @@ protected function configure(): void
php %command.full_name%
+To receive from multiple transports, pass each name:
+
+ php %command.full_name% receiver1 receiver2
+
Use the --limit option to limit the number of messages received:
php %command.full_name% --limit=10
@@ -111,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
- if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
- if (null === $receiverName) {
- $io->block('Missing receiver argument.', null, 'error', ' ', true);
- $input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
- } elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
- $io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
- if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
- $input->setArgument('receiver', $alternatives[0]);
- }
+ if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
+ $io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
+
+ $io->writeln('Choose which receivers you want to consume messages from in order of priority.');
+ if (\count($this->receiverNames) > 1) {
+ $io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. %s', implode(', ', $this->receiverNames)));
}
+
+ $question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
+ $question->setMultiselect(true);
+
+ $input->setArgument('receivers', $io->askQuestion($question));
+ }
+
+ if (0 === \count($input->getArgument('receivers'))) {
+ throw new RuntimeException('Please pass at least one receiver.');
}
}
@@ -135,16 +147,25 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$output->writeln(sprintf('%s', $message));
}
- if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
- throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
- }
+ $receivers = [];
+ $retryStrategies = [];
+ foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
+ if (!$this->receiverLocator->has($receiverName)) {
+ $message = sprintf('The receiver "%s" does not exist.', $receiverName);
+ if ($this->receiverNames) {
+ $message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
+ }
- if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
- throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
- }
+ throw new RuntimeException($message);
+ }
- $receiver = $this->receiverLocator->get($receiverName);
- $retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
+ if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
+ throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
+ }
+
+ $receivers[$receiverName] = $this->receiverLocator->get($receiverName);
+ $retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
+ }
if (null !== $input->getOption('bus')) {
$bus = $this->busLocator->get($input->getOption('bus'));
@@ -152,24 +173,25 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$bus = new RoutableMessageBus($this->busLocator);
}
+ $worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
- $receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
+ $worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
}
if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
- $receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
+ $worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
}
if ($timeLimit = $input->getOption('time-limit')) {
$stopsWhen[] = "been running for {$timeLimit}s";
- $receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
+ $worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
}
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
- $io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
+ $io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
if ($stopsWhen) {
$last = array_pop($stopsWhen);
@@ -183,8 +205,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}
- $worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
- $worker->run();
+ $worker->run([
+ 'sleep' => $input->getOption('sleep') * 1000000,
+ ]);
}
private function convertToBytes(string $memoryLimit): int
diff --git a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
index 336cd8129cf4c..e7ce90b85c0bf 100644
--- a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
@@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
public function testConfigurationWithDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
- $inputArgument = $command->getDefinition()->getArgument('receiver');
+ $inputArgument = $command->getDefinition()->getArgument('receivers');
$this->assertFalse($inputArgument->isRequired());
- $this->assertSame('amqp', $inputArgument->getDefault());
- }
-
- public function testConfigurationWithoutDefaultReceiver()
- {
- $command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
- $inputArgument = $command->getDefinition()->getArgument('receiver');
- $this->assertTrue($inputArgument->isRequired());
- $this->assertNull($inputArgument->getDefault());
+ $this->assertSame(['amqp'], $inputArgument->getDefault());
}
}
diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
index 4e32ba9b49aab..51e7cbc88c1f3 100644
--- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
+++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
@@ -612,11 +612,9 @@ public function __invoke(DummyMessage $message): void
class DummyReceiver implements ReceiverInterface
{
- public function receive(callable $handler): void
+ public function get(): iterable
{
- for ($i = 0; $i < 3; ++$i) {
- $handler(new Envelope(new DummyMessage("Dummy $i")));
- }
+ yield new Envelope(new DummyMessage('Dummy'));
}
public function stop(): void
diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php b/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php
deleted file mode 100644
index b1d26934d252c..0000000000000
--- a/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php
+++ /dev/null
@@ -1,48 +0,0 @@
-callable = $callable;
- }
-
- public function receive(callable $handler): void
- {
- $callable = $this->callable;
- $callable($handler);
- }
-
- public function stop(): void
- {
- }
-
- public function ack(Envelope $envelope): void
- {
- ++$this->acknowledgeCount;
- }
-
- public function reject(Envelope $envelope): void
- {
- ++$this->rejectCount;
- }
-
- public function getAcknowledgeCount(): int
- {
- return $this->acknowledgeCount;
- }
-
- public function getRejectCount(): int
- {
- return $this->rejectCount;
- }
-}
diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php
new file mode 100644
index 0000000000000..2c66bdedbeba6
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php
@@ -0,0 +1,46 @@
+envelopesToReceive = $envelopesToReceive;
+ }
+
+ public function run(array $options = [], callable $onHandledCallback = null): void
+ {
+ foreach ($this->envelopesToReceive as $envelope) {
+ if (true === $this->isStopped) {
+ break;
+ }
+
+ if ($onHandledCallback) {
+ $onHandledCallback($envelope);
+ ++$this->envelopesHandled;
+ }
+ }
+ }
+
+ public function stop(): void
+ {
+ $this->isStopped = true;
+ }
+
+ public function isStopped(): bool
+ {
+ return $this->isStopped;
+ }
+
+ public function countEnvelopesHandled()
+ {
+ return $this->envelopesHandled;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
index 2949ae837fe36..b2ebfe4ab5043 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
@@ -57,16 +57,20 @@ public function testItSendsAndReceivesMessages()
$sender->send($first = new Envelope(new DummyMessage('First')));
$sender->send($second = new Envelope(new DummyMessage('Second')));
- $receivedMessages = 0;
- $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
- $expectedEnvelope = 0 === $receivedMessages ? $first : $second;
- $this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage());
- $this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
-
- if (2 === ++$receivedMessages) {
- $receiver->stop();
- }
- });
+ $envelopes = iterator_to_array($receiver->get());
+ $this->assertCount(1, $envelopes);
+ /** @var Envelope $envelope */
+ $envelope = $envelopes[0];
+ $this->assertEquals($first->getMessage(), $envelope->getMessage());
+ $this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
+
+ $envelopes = iterator_to_array($receiver->get());
+ $this->assertCount(1, $envelopes);
+ /** @var Envelope $envelope */
+ $envelope = $envelopes[0];
+ $this->assertEquals($second->getMessage(), $envelope->getMessage());
+
+ $this->assertEmpty(iterator_to_array($receiver->get()));
}
public function testRetryAndDelay()
@@ -82,50 +86,38 @@ public function testRetryAndDelay()
$sender->send($first = new Envelope(new DummyMessage('First')));
- $receivedMessages = 0;
- $startTime = time();
- $receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) {
- if (null === $envelope) {
- // if we have been processing for 4 seconds + have received 2 messages
- // then it's safe to say no other messages will be received
- if (time() > $startTime + 4 && 2 === $receivedMessages) {
- $receiver->stop();
- }
-
- return;
- }
-
- ++$receivedMessages;
-
- // retry the first time
- if (1 === $receivedMessages) {
- // imitate what Worker does
- $envelope = $envelope
- ->with(new DelayStamp(2000))
- ->with(new RedeliveryStamp(1, 'not_important'));
- $sender->send($envelope);
- $receiver->ack($envelope);
+ $envelopes = iterator_to_array($receiver->get());
+ /** @var Envelope $envelope */
+ $envelope = $envelopes[0];
+ $newEnvelope = $envelope
+ ->with(new DelayStamp(2000))
+ ->with(new RedeliveryStamp(1, 'not_important'));
+ $sender->send($newEnvelope);
+ $receiver->ack($envelope);
- return;
- }
+ $envelopes = [];
+ $startTime = time();
+ // wait for next message, but only for max 3 seconds
+ while (0 === \count($envelopes) && $startTime + 3 > time()) {
+ $envelopes = iterator_to_array($receiver->get());
+ }
- if (2 === $receivedMessages) {
- // should have a 2 second delay
- $this->assertGreaterThanOrEqual($startTime + 2, time());
- // but only a 2 second delay
- $this->assertLessThan($startTime + 4, time());
+ $this->assertCount(1, $envelopes);
+ /** @var Envelope $envelope */
+ $envelope = $envelopes[0];
- /** @var RedeliveryStamp|null $retryStamp */
- // verify the stamp still exists from the last send
- $retryStamp = $envelope->last(RedeliveryStamp::class);
- $this->assertNotNull($retryStamp);
- $this->assertSame(1, $retryStamp->getRetryCount());
+ // should have a 2 second delay
+ $this->assertGreaterThanOrEqual($startTime + 2, time());
+ // but only a 2 second delay
+ $this->assertLessThan($startTime + 4, time());
- $receiver->ack($envelope);
+ /** @var RedeliveryStamp|null $retryStamp */
+ // verify the stamp still exists from the last send
+ $retryStamp = $envelope->last(RedeliveryStamp::class);
+ $this->assertNotNull($retryStamp);
+ $this->assertSame(1, $retryStamp->getRetryCount());
- return;
- }
- });
+ $receiver->ack($envelope);
}
public function testItReceivesSignals()
@@ -175,29 +167,6 @@ public function testItReceivesSignals()
, $process->getOutput());
}
- /**
- * @runInSeparateProcess
- */
- public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
- {
- $serializer = $this->createSerializer();
-
- $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']);
- $connection->setup();
- $connection->queue()->purge();
-
- $receiver = new AmqpReceiver($connection, $serializer);
-
- $receivedMessages = 0;
- $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
- $this->assertNull($envelope);
-
- if (2 === ++$receivedMessages) {
- $receiver->stop();
- }
- });
- }
-
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
{
$timedOutTime = time() + $timeoutInSeconds;
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
index d0c8abfa3564e..d27ddb9cd26a9 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
@@ -14,9 +14,11 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
@@ -26,7 +28,7 @@
*/
class AmqpReceiverTest extends TestCase
{
- public function testItSendTheDecodedMessageToTheHandler()
+ public function testItReturnsTheDecodedMessageToTheHandler()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
@@ -37,10 +39,9 @@ public function testItSendTheDecodedMessageToTheHandler()
$connection->method('get')->willReturn($amqpEnvelope);
$receiver = new AmqpReceiver($connection, $serializer);
- $receiver->receive(function (?Envelope $envelope) use ($receiver) {
- $this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
- $receiver->stop();
- });
+ $actualEnvelopes = iterator_to_array($receiver->get());
+ $this->assertCount(1, $actualEnvelopes);
+ $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}
/**
@@ -48,20 +49,14 @@ public function testItSendTheDecodedMessageToTheHandler()
*/
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
- $serializer = new Serializer(
- new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
- );
-
+ $serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
- $receiver->receive(function (?Envelope $envelope) use ($receiver) {
- $receiver->ack($envelope);
- $receiver->stop();
- });
+ $receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
}
/**
@@ -69,20 +64,14 @@ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
*/
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
{
- $serializer = new Serializer(
- new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
- );
-
+ $serializer = $this->createMock(SerializerInterface::class);
$amqpEnvelope = $this->createAMQPEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($amqpEnvelope);
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
$receiver = new AmqpReceiver($connection, $serializer);
- $receiver->receive(function (?Envelope $envelope) use ($receiver) {
- $receiver->reject($envelope);
- $receiver->stop();
- });
+ $receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
}
private function createAMQPEnvelope()
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php
index c343c29226369..b3e8b5729dd70 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php
@@ -47,11 +47,8 @@ public function testReceivesMessages()
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($amqpEnvelope);
- $transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
- $this->assertSame($decodedMessage, $envelope->getMessage());
-
- $transport->stop();
- });
+ $envelopes = iterator_to_array($transport->get());
+ $this->assertSame($decodedMessage, $envelopes[0]->getMessage());
}
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php
index ba7236103579b..f51412093451c 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php
@@ -32,7 +32,7 @@
$receiver = new AmqpReceiver($connection, $serializer);
$retryStrategy = new MultiplierRetryStrategy(3, 0);
-$worker = new Worker($receiver, new class() implements MessageBusInterface {
+$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
public function dispatch($envelope): Envelope
{
echo 'Get envelope with message: '.\get_class($envelope->getMessage())."\n";
@@ -43,7 +43,7 @@ public function dispatch($envelope): Envelope
return $envelope;
}
-}, 'the_receiver', $retryStrategy);
+});
echo "Receiving messages...\n";
$worker->run();
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php
deleted file mode 100644
index 27314e75502e7..0000000000000
--- a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php
+++ /dev/null
@@ -1,84 +0,0 @@
-
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Symfony\Component\Messenger\Tests\Transport\Receiver;
-
-use PHPUnit\Framework\TestCase;
-use Psr\Log\LoggerInterface;
-use Symfony\Component\Messenger\Envelope;
-use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
-use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
-use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
-
-class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase
-{
- /**
- * @dataProvider memoryProvider
- */
- public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
- {
- $callable = function ($handler) {
- $handler(new Envelope(new DummyMessage('API')));
- };
-
- $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
- ->setConstructorArgs([$callable])
- ->enableProxyingToOriginalMethods()
- ->getMock();
-
- $decoratedReceiver->expects($this->once())->method('receive');
- if (true === $shouldStop) {
- $decoratedReceiver->expects($this->once())->method('stop');
- } else {
- $decoratedReceiver->expects($this->never())->method('stop');
- }
-
- $memoryResolver = function () use ($memoryUsage) {
- return $memoryUsage;
- };
-
- $memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, $memoryLimit, null, $memoryResolver);
- $memoryLimitReceiver->receive(function () {});
- }
-
- public function memoryProvider()
- {
- yield [2048, 1024, true];
- yield [1024, 1024, false];
- yield [1024, 2048, false];
- }
-
- public function testReceiverLogsMemoryExceededWhenLoggerIsGiven()
- {
- $callable = function ($handler) {
- $handler(new Envelope(new DummyMessage('API')));
- };
-
- $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
- ->setConstructorArgs([$callable])
- ->enableProxyingToOriginalMethods()
- ->getMock();
-
- $decoratedReceiver->expects($this->once())->method('receive');
- $decoratedReceiver->expects($this->once())->method('stop');
-
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects($this->once())->method('info')
- ->with('Receiver stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]);
-
- $memoryResolver = function () {
- return 70 * 1024 * 1024;
- };
-
- $memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, 64 * 1024 * 1024, $logger, $memoryResolver);
- $memoryLimitReceiver->receive(function () {});
- }
-}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php
deleted file mode 100644
index 1a303728a94e4..0000000000000
--- a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php
+++ /dev/null
@@ -1,103 +0,0 @@
-
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Symfony\Component\Messenger\Tests\Transport\Receiver;
-
-use PHPUnit\Framework\TestCase;
-use Psr\Log\LoggerInterface;
-use Symfony\Component\Messenger\Envelope;
-use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
-use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
-use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
-
-class StopWhenMessageCountIsExceededReceiverTest extends TestCase
-{
- /**
- * @dataProvider countProvider
- */
- public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop)
- {
- $callable = function ($handler) {
- $handler(new Envelope(new DummyMessage('First message')));
- $handler(new Envelope(new DummyMessage('Second message')));
- $handler(new Envelope(new DummyMessage('Third message')));
- };
-
- $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
- ->setConstructorArgs([$callable])
- ->enableProxyingToOriginalMethods()
- ->getMock();
-
- $decoratedReceiver->expects($this->once())->method('receive');
- if (true === $shouldStop) {
- $decoratedReceiver->expects($this->any())->method('stop');
- } else {
- $decoratedReceiver->expects($this->never())->method('stop');
- }
-
- $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, $max);
- $maximumCountReceiver->receive(function () {});
- }
-
- public function countProvider()
- {
- yield [1, true];
- yield [2, true];
- yield [3, true];
- yield [4, false];
- }
-
- public function testReceiverDoesntIncreaseItsCounterWhenReceiveNullMessage()
- {
- $callable = function ($handler) {
- $handler(null);
- $handler(null);
- $handler(null);
- $handler(null);
- };
-
- $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
- ->setConstructorArgs([$callable])
- ->enableProxyingToOriginalMethods()
- ->getMock();
-
- $decoratedReceiver->expects($this->once())->method('receive');
- $decoratedReceiver->expects($this->never())->method('stop');
-
- $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1);
- $maximumCountReceiver->receive(function () {});
- }
-
- public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven()
- {
- $callable = function ($handler) {
- $handler(new Envelope(new DummyMessage('First message')));
- };
-
- $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
- ->setConstructorArgs([$callable])
- ->enableProxyingToOriginalMethods()
- ->getMock();
-
- $decoratedReceiver->expects($this->once())->method('receive');
- $decoratedReceiver->expects($this->once())->method('stop');
-
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects($this->once())->method('info')
- ->with(
- $this->equalTo('Receiver stopped due to maximum count of {count} exceeded'),
- $this->equalTo(['count' => 1])
- );
-
- $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1, $logger);
- $maximumCountReceiver->receive(function () {});
- }
-}
diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php
deleted file mode 100644
index 472703fe6f7f1..0000000000000
--- a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php
+++ /dev/null
@@ -1,49 +0,0 @@
-
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Symfony\Component\Messenger\Tests\Transport\Receiver;
-
-use PHPUnit\Framework\TestCase;
-use Psr\Log\LoggerInterface;
-use Symfony\Component\Messenger\Envelope;
-use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
-use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
-use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
-
-class StopWhenTimeLimitIsReachedReceiverTest extends TestCase
-{
- /**
- * @group time-sensitive
- */
- public function testReceiverStopsWhenTimeLimitIsReached()
- {
- $callable = function ($handler) {
- $handler(new Envelope(new DummyMessage('API')));
- };
-
- $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class)
- ->setConstructorArgs([$callable])
- ->enableProxyingToOriginalMethods()
- ->getMock();
-
- $decoratedReceiver->expects($this->once())->method('receive');
- $decoratedReceiver->expects($this->once())->method('stop');
-
- $logger = $this->createMock(LoggerInterface::class);
- $logger->expects($this->once())->method('info')
- ->with('Receiver stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]);
-
- $timeoutReceiver = new StopWhenTimeLimitIsReachedReceiver($decoratedReceiver, 1, $logger);
- $timeoutReceiver->receive(function () {
- sleep(2);
- });
- }
-}
diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php
new file mode 100644
index 0000000000000..d8a80e37a1468
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php
@@ -0,0 +1,71 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Worker;
+
+use PHPUnit\Framework\TestCase;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
+use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
+
+class StopWhenMemoryUsageIsExceededWorkerTest extends TestCase
+{
+ /**
+ * @dataProvider memoryProvider
+ */
+ public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop)
+ {
+ $handlerCalledTimes = 0;
+ $handledCallback = function () use (&$handlerCalledTimes) {
+ ++$handlerCalledTimes;
+ };
+ $decoratedWorker = new DummyWorker([
+ new Envelope(new \stdClass()),
+ ]);
+
+ $memoryResolver = function () use ($memoryUsage) {
+ return $memoryUsage;
+ };
+
+ $memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver);
+ $memoryLimitWorker->run([], $handledCallback);
+
+ // handler should be called exactly 1 time
+ $this->assertSame($handlerCalledTimes, 1);
+ $this->assertSame($shouldStop, $decoratedWorker->isStopped());
+ }
+
+ public function memoryProvider()
+ {
+ yield [2048, 1024, true];
+ yield [1024, 1024, false];
+ yield [1024, 2048, false];
+ }
+
+ public function testWorkerLogsMemoryExceededWhenLoggerIsGiven()
+ {
+ $decoratedWorker = new DummyWorker([
+ new Envelope(new \stdClass()),
+ ]);
+
+ $logger = $this->createMock(LoggerInterface::class);
+ $logger->expects($this->once())->method('info')
+ ->with('Worker stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]);
+
+ $memoryResolver = function () {
+ return 70 * 1024 * 1024;
+ };
+
+ $memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, 64 * 1024 * 1024, $logger, $memoryResolver);
+ $memoryLimitWorker->run();
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php
new file mode 100644
index 0000000000000..f22d4524bfd37
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php
@@ -0,0 +1,71 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Worker;
+
+use PHPUnit\Framework\TestCase;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
+use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
+
+class StopWhenMessageCountIsExceededWorkerTest extends TestCase
+{
+ /**
+ * @dataProvider countProvider
+ */
+ public function testWorkerStopsWhenMaximumCountExceeded($max, $shouldStop)
+ {
+ $handlerCalledTimes = 0;
+ $handledCallback = function () use (&$handlerCalledTimes) {
+ ++$handlerCalledTimes;
+ };
+ // receive 3 real messages
+ $decoratedWorker = new DummyWorker([
+ new Envelope(new DummyMessage('First message')),
+ null,
+ new Envelope(new DummyMessage('Second message')),
+ null,
+ new Envelope(new DummyMessage('Third message')),
+ ]);
+
+ $maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max);
+ $maximumCountWorker->run([], $handledCallback);
+
+ $this->assertSame($shouldStop, $decoratedWorker->isStopped());
+ }
+
+ public function countProvider()
+ {
+ yield [1, true];
+ yield [2, true];
+ yield [3, true];
+ yield [4, false];
+ }
+
+ public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven()
+ {
+ $decoratedWorker = new DummyWorker([
+ new Envelope(new \stdClass()),
+ ]);
+
+ $logger = $this->createMock(LoggerInterface::class);
+ $logger->expects($this->once())->method('info')
+ ->with(
+ $this->equalTo('Worker stopped due to maximum count of {count} exceeded'),
+ $this->equalTo(['count' => 1])
+ );
+
+ $maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, 1, $logger);
+ $maximumCountWorker->run();
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php
new file mode 100644
index 0000000000000..4b06c7392b4a6
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php
@@ -0,0 +1,44 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Tests\Worker;
+
+use PHPUnit\Framework\TestCase;
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
+use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
+
+class StopWhenTimeLimitIsReachedWorkerTest extends TestCase
+{
+ /**
+ * @group time-sensitive
+ */
+ public function testWorkerStopsWhenTimeLimitIsReached()
+ {
+ $decoratedWorker = new DummyWorker([
+ new Envelope(new \stdClass()),
+ new Envelope(new \stdClass()),
+ ]);
+
+ $logger = $this->createMock(LoggerInterface::class);
+ $logger->expects($this->once())->method('info')
+ ->with('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]);
+
+ $timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger);
+ $timeoutWorker->run([], function () {
+ sleep(2);
+ });
+
+ $this->assertTrue($decoratedWorker->isStopped());
+ $this->assertSame(1, $decoratedWorker->countEnvelopesHandled());
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php
index 4ecb7719cbaef..b5c6a6557b2cd 100644
--- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php
+++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php
@@ -22,11 +22,14 @@
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
-use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
+/**
+ * @group time-sensitive
+ */
class WorkerTest extends TestCase
{
public function testWorkerDispatchTheReceivedMessage()
@@ -34,18 +37,22 @@ public function testWorkerDispatchTheReceivedMessage()
$apiMessage = new DummyMessage('API');
$ipaMessage = new DummyMessage('IPA');
- $receiver = new CallbackReceiver(function ($handler) use ($apiMessage, $ipaMessage) {
- $handler(new Envelope($apiMessage));
- $handler(new Envelope($ipaMessage));
- });
+ $receiver = new DummyReceiver([
+ [new Envelope($apiMessage), new Envelope($ipaMessage)],
+ ]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, new ReceivedStamp()))->willReturn($envelope);
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope);
- $worker = new Worker($receiver, $bus, 'receiver_id');
- $worker->run();
+ $worker = new Worker([$receiver], $bus);
+ $worker->run([], function (?Envelope $envelope) use ($worker) {
+ // stop after the messages finish
+ if (null === $envelope) {
+ $worker->stop();
+ }
+ });
$this->assertSame(2, $receiver->getAcknowledgeCount());
}
@@ -53,24 +60,26 @@ public function testWorkerDispatchTheReceivedMessage()
public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
{
$envelope = new Envelope(new DummyMessage('API'));
- $receiver = new CallbackReceiver(function ($handler) use ($envelope) {
- $handler($envelope);
- });
+ $receiver = new DummyReceiver([[$envelope]]);
$envelope = $envelope->with(new ReceivedStamp());
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);
- $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
- $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
- $worker->run();
+ $worker = new Worker([$receiver], $bus, []);
+ $worker->run([], function (?Envelope $envelope) use ($worker) {
+ // stop after the messages finish
+ if (null === $envelope) {
+ $worker->stop();
+ }
+ });
}
public function testDispatchCausesRetry()
{
- $receiver = new CallbackReceiver(function ($handler) {
- $handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')));
- });
+ $receiver = new DummyReceiver([
+ [new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))],
+ ]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
@@ -93,8 +102,13 @@ public function testDispatchCausesRetry()
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
- $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
- $worker->run();
+ $worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
+ $worker->run([], function (?Envelope $envelope) use ($worker) {
+ // stop after the messages finish
+ if (null === $envelope) {
+ $worker->stop();
+ }
+ });
// old message acknowledged
$this->assertSame(1, $receiver->getAcknowledgeCount());
@@ -102,9 +116,9 @@ public function testDispatchCausesRetry()
public function testDispatchCausesRejectWhenNoRetry()
{
- $receiver = new CallbackReceiver(function ($handler) {
- $handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias')));
- });
+ $receiver = new DummyReceiver([
+ [new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))],
+ ]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not'));
@@ -112,17 +126,22 @@ public function testDispatchCausesRejectWhenNoRetry()
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
- $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
- $worker->run();
+ $worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
+ $worker->run([], function (?Envelope $envelope) use ($worker) {
+ // stop after the messages finish
+ if (null === $envelope) {
+ $worker->stop();
+ }
+ });
$this->assertSame(1, $receiver->getRejectCount());
$this->assertSame(0, $receiver->getAcknowledgeCount());
}
public function testDispatchCausesRejectOnUnrecoverableMessage()
{
- $receiver = new CallbackReceiver(function ($handler) {
- $handler(new Envelope(new DummyMessage('Hello')));
- });
+ $receiver = new DummyReceiver([
+ [new Envelope(new DummyMessage('Hello'))],
+ ]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work'));
@@ -130,36 +149,42 @@ public function testDispatchCausesRejectOnUnrecoverableMessage()
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->never())->method('isRetryable');
- $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
- $worker->run();
+ $worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
+ $worker->run([], function (?Envelope $envelope) use ($worker) {
+ // stop after the messages finish
+ if (null === $envelope) {
+ $worker->stop();
+ }
+ });
$this->assertSame(1, $receiver->getRejectCount());
}
public function testWorkerDoesNotSendNullMessagesToTheBus()
{
- $receiver = new CallbackReceiver(function ($handler) {
- $handler(null);
- });
+ $receiver = new DummyReceiver([
+ null,
+ ]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->expects($this->never())->method('dispatch');
- $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
- $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
- $worker->run();
+ $worker = new Worker([$receiver], $bus);
+ $worker->run([], function (?Envelope $envelope) use ($worker) {
+ // stop after the messages finish
+ if (null === $envelope) {
+ $worker->stop();
+ }
+ });
}
public function testWorkerDispatchesEventsOnSuccess()
{
$envelope = new Envelope(new DummyMessage('Hello'));
- $receiver = new CallbackReceiver(function ($handler) use ($envelope) {
- $handler($envelope);
- });
+ $receiver = new DummyReceiver([[$envelope]]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willReturn($envelope);
- $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
$eventDispatcher->expects($this->exactly(2))
@@ -169,22 +194,24 @@ public function testWorkerDispatchesEventsOnSuccess()
[$this->isInstanceOf(WorkerMessageHandledEvent::class)]
);
- $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
- $worker->run();
+ $worker = new Worker([$receiver], $bus, [], $eventDispatcher);
+ $worker->run([], function (?Envelope $envelope) use ($worker) {
+ // stop after the messages finish
+ if (null === $envelope) {
+ $worker->stop();
+ }
+ });
}
public function testWorkerDispatchesEventsOnError()
{
$envelope = new Envelope(new DummyMessage('Hello'));
- $receiver = new CallbackReceiver(function ($handler) use ($envelope) {
- $handler($envelope);
- });
+ $receiver = new DummyReceiver([[$envelope]]);
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$exception = new \InvalidArgumentException('Oh no!');
$bus->method('dispatch')->willThrowException($exception);
- $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock();
$eventDispatcher->expects($this->exactly(2))
@@ -194,7 +221,143 @@ public function testWorkerDispatchesEventsOnError()
[$this->isInstanceOf(WorkerMessageFailedEvent::class)]
);
- $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
- $worker->run();
+ $worker = new Worker([$receiver], $bus, [], $eventDispatcher);
+ $worker->run([], function (?Envelope $envelope) use ($worker) {
+ // stop after the messages finish
+ if (null === $envelope) {
+ $worker->stop();
+ }
+ });
+ }
+
+ public function testTimeoutIsConfigurable()
+ {
+ $apiMessage = new DummyMessage('API');
+ $receiver = new DummyReceiver([
+ [new Envelope($apiMessage), new Envelope($apiMessage)],
+ [], // will cause a wait
+ [], // will cause a wait
+ [new Envelope($apiMessage)],
+ [new Envelope($apiMessage)],
+ [], // will cause a wait
+ [new Envelope($apiMessage)],
+ ]);
+
+ $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
+
+ $worker = new Worker([$receiver], $bus);
+ $receivedCount = 0;
+ $startTime = microtime(true);
+ // sleep .1 after each idle
+ $worker->run(['sleep' => 100000], function (?Envelope $envelope) use ($worker, &$receivedCount, $startTime) {
+ if (null !== $envelope) {
+ ++$receivedCount;
+ }
+
+ if (5 === $receivedCount) {
+ $worker->stop();
+ $duration = microtime(true) - $startTime;
+
+ // wait time should be .3 seconds
+ // use .29 & .31 for timing "wiggle room"
+ $this->assertGreaterThanOrEqual(.29, $duration);
+ $this->assertLessThan(.31, $duration);
+ }
+ });
+ }
+
+ public function testWorkerWithMultipleReceivers()
+ {
+ // envelopes, in their expected delivery order
+ $envelope1 = new Envelope(new DummyMessage('message1'));
+ $envelope2 = new Envelope(new DummyMessage('message2'));
+ $envelope3 = new Envelope(new DummyMessage('message3'));
+ $envelope4 = new Envelope(new DummyMessage('message4'));
+ $envelope5 = new Envelope(new DummyMessage('message5'));
+ $envelope6 = new Envelope(new DummyMessage('message6'));
+
+ /*
+ * Round 1) receiver 1 & 2 have nothing, receiver 3 processes envelope1 and envelope2
+ * Round 2) receiver 1 has nothing, receiver 2 processes envelope3, receiver 3 is not called
+ * Round 3) receiver 1 processes envelope 4, receivers 2 & 3 are not called
+ * Round 4) receiver 1 processes envelope 5, receivers 2 & 3 are not called
+ * Round 5) receiver 1 has nothing, receiver 2 has nothing, receiver 3 has envelope 6
+ */
+ $receiver1 = new DummyReceiver([
+ [],
+ [],
+ [$envelope4],
+ [$envelope5],
+ [],
+ ]);
+ $receiver2 = new DummyReceiver([
+ [],
+ [$envelope3],
+ [],
+ ]);
+ $receiver3 = new DummyReceiver([
+ [$envelope1, $envelope2],
+ [],
+ [$envelope6],
+ ]);
+
+ $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
+
+ $receivedCount = 0;
+ $worker = new Worker([$receiver1, $receiver2, $receiver3], $bus);
+ $processedEnvelopes = [];
+ $worker->run([], function (?Envelope $envelope) use ($worker, &$receivedCount, &$processedEnvelopes) {
+ if (null !== $envelope) {
+ $processedEnvelopes[] = $envelope;
+ ++$receivedCount;
+ }
+
+ // stop after the messages finish
+ if (6 === $receivedCount) {
+ $worker->stop();
+ }
+ });
+
+ // make sure they were processed in the correct order
+ $this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
+ }
+}
+
+class DummyReceiver implements ReceiverInterface
+{
+ private $deliveriesOfEnvelopes;
+ private $acknowledgeCount = 0;
+ private $rejectCount = 0;
+
+ public function __construct(array $deliveriesOfEnvelopes)
+ {
+ $this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
+ }
+
+ public function get(): iterable
+ {
+ $val = array_shift($this->deliveriesOfEnvelopes);
+
+ return null === $val ? [] : $val;
+ }
+
+ public function ack(Envelope $envelope): void
+ {
+ ++$this->acknowledgeCount;
+ }
+
+ public function reject(Envelope $envelope): void
+ {
+ ++$this->rejectCount;
+ }
+
+ public function getAcknowledgeCount(): int
+ {
+ return $this->acknowledgeCount;
+ }
+
+ public function getRejectCount(): int
+ {
+ return $this->rejectCount;
}
}
diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
index 93afbaff5d8b8..ce1918e1da0c1 100644
--- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
+++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
@@ -30,7 +30,6 @@ class AmqpReceiver implements ReceiverInterface
{
private $serializer;
private $connection;
- private $shouldStop;
public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
@@ -41,38 +40,31 @@ public function __construct(Connection $connection, SerializerInterface $seriali
/**
* {@inheritdoc}
*/
- public function receive(callable $handler): void
+ public function get(): iterable
{
- while (!$this->shouldStop) {
- try {
- $amqpEnvelope = $this->connection->get();
- } catch (\AMQPException $exception) {
- throw new TransportException($exception->getMessage(), 0, $exception);
- }
-
- if (null === $amqpEnvelope) {
- $handler(null);
-
- usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? 200000);
-
- continue;
- }
-
- try {
- $envelope = $this->serializer->decode([
- 'body' => $amqpEnvelope->getBody(),
- 'headers' => $amqpEnvelope->getHeaders(),
- ]);
- } catch (MessageDecodingFailedException $exception) {
- // invalid message of some type
- $this->rejectAmqpEnvelope($amqpEnvelope);
-
- throw $exception;
- }
-
- $envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
- $handler($envelope);
+ try {
+ $amqpEnvelope = $this->connection->get();
+ } catch (\AMQPException $exception) {
+ throw new TransportException($exception->getMessage(), 0, $exception);
+ }
+
+ if (null === $amqpEnvelope) {
+ return [];
+ }
+
+ try {
+ $envelope = $this->serializer->decode([
+ 'body' => $amqpEnvelope->getBody(),
+ 'headers' => $amqpEnvelope->getHeaders(),
+ ]);
+ } catch (MessageDecodingFailedException $exception) {
+ // invalid message of some type
+ $this->rejectAmqpEnvelope($amqpEnvelope);
+
+ throw $exception;
}
+
+ yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
}
public function ack(Envelope $envelope): void
@@ -89,11 +81,6 @@ public function reject(Envelope $envelope): void
$this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope));
}
- public function stop(): void
- {
- $this->shouldStop = true;
- }
-
private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void
{
try {
diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
index 2146bd900d29c..586e16c4d86dd 100644
--- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
+++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
@@ -38,17 +38,9 @@ public function __construct(Connection $connection, SerializerInterface $seriali
/**
* {@inheritdoc}
*/
- public function receive(callable $handler): void
+ public function get(): iterable
{
- ($this->receiver ?? $this->getReceiver())->receive($handler);
- }
-
- /**
- * {@inheritdoc}
- */
- public function stop(): void
- {
- ($this->receiver ?? $this->getReceiver())->stop();
+ return ($this->receiver ?? $this->getReceiver())->get();
}
/**
diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
index 29da741a11f34..d974ff00b7902 100644
--- a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
+++ b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
@@ -25,21 +25,25 @@ interface ReceiverInterface
/**
* Receive some messages to the given handler.
*
- * The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
- * Note that this envelope can be `null` if the timeout to receive something has expired.
+ * While this method could return an unlimited number of messages,
+ * the intention is that it returns only one, or a "small number"
+ * of messages each time. This gives the user more flexibility:
+ * they can finish processing the one (or "small number") of messages
+ * from this receiver and move on to check other receivers for messages.
+ * If a this method returns too many messages, it could cause a
+ * blocking effect where handling the messages received from one
+ * call to get() takes a long time, blocking other receivers from
+ * being called.
*
- * If the received message cannot be decoded, the message should not
+ * If a received message cannot be decoded, the message should not
* be retried again (e.g. if there's a queue, it should be removed)
* and a MessageDecodingFailedException should be thrown.
*
* @throws TransportException If there is an issue communicating with the transport
+ *
+ * @return Envelope[]
*/
- public function receive(callable $handler): void;
-
- /**
- * Stop receiving some messages.
- */
- public function stop(): void;
+ public function get(): iterable;
/**
* Acknowledge that the passed message was handled.
diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php
deleted file mode 100644
index 09af4673b87b4..0000000000000
--- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php
+++ /dev/null
@@ -1,68 +0,0 @@
-
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Symfony\Component\Messenger\Transport\Receiver;
-
-use Psr\Log\LoggerInterface;
-use Symfony\Component\Messenger\Envelope;
-
-/**
- * @author Simon Delicata
- *
- * @experimental in 4.2
- */
-class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface
-{
- private $decoratedReceiver;
- private $memoryLimit;
- private $logger;
- private $memoryResolver;
-
- public function __construct(ReceiverInterface $decoratedReceiver, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
- {
- $this->decoratedReceiver = $decoratedReceiver;
- $this->memoryLimit = $memoryLimit;
- $this->logger = $logger;
- $this->memoryResolver = $memoryResolver ?: function () {
- return \memory_get_usage();
- };
- }
-
- public function receive(callable $handler): void
- {
- $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
- $handler($envelope);
-
- $memoryResolver = $this->memoryResolver;
- if ($memoryResolver() > $this->memoryLimit) {
- $this->stop();
- if (null !== $this->logger) {
- $this->logger->info('Receiver stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]);
- }
- }
- });
- }
-
- public function stop(): void
- {
- $this->decoratedReceiver->stop();
- }
-
- public function ack(Envelope $envelope): void
- {
- $this->decoratedReceiver->ack($envelope);
- }
-
- public function reject(Envelope $envelope): void
- {
- $this->decoratedReceiver->reject($envelope);
- }
-}
diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php
deleted file mode 100644
index 8be38d157e8f1..0000000000000
--- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php
+++ /dev/null
@@ -1,65 +0,0 @@
-
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Symfony\Component\Messenger\Transport\Receiver;
-
-use Psr\Log\LoggerInterface;
-use Symfony\Component\Messenger\Envelope;
-
-/**
- * @author Samuel Roze
- *
- * @experimental in 4.2
- */
-class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface
-{
- private $decoratedReceiver;
- private $maximumNumberOfMessages;
- private $logger;
-
- public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages, LoggerInterface $logger = null)
- {
- $this->decoratedReceiver = $decoratedReceiver;
- $this->maximumNumberOfMessages = $maximumNumberOfMessages;
- $this->logger = $logger;
- }
-
- public function receive(callable $handler): void
- {
- $receivedMessages = 0;
-
- $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) {
- $handler($envelope);
-
- if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
- $this->stop();
- if (null !== $this->logger) {
- $this->logger->info('Receiver stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]);
- }
- }
- });
- }
-
- public function stop(): void
- {
- $this->decoratedReceiver->stop();
- }
-
- public function ack(Envelope $envelope): void
- {
- $this->decoratedReceiver->ack($envelope);
- }
-
- public function reject(Envelope $envelope): void
- {
- $this->decoratedReceiver->reject($envelope);
- }
-}
diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php
deleted file mode 100644
index ade088b7dabb1..0000000000000
--- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php
+++ /dev/null
@@ -1,66 +0,0 @@
-
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Symfony\Component\Messenger\Transport\Receiver;
-
-use Psr\Log\LoggerInterface;
-use Symfony\Component\Messenger\Envelope;
-
-/**
- * @author Simon Delicata
- *
- * @experimental in 4.2
- */
-class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface
-{
- private $decoratedReceiver;
- private $timeLimitInSeconds;
- private $logger;
-
- public function __construct(ReceiverInterface $decoratedReceiver, int $timeLimitInSeconds, LoggerInterface $logger = null)
- {
- $this->decoratedReceiver = $decoratedReceiver;
- $this->timeLimitInSeconds = $timeLimitInSeconds;
- $this->logger = $logger;
- }
-
- public function receive(callable $handler): void
- {
- $startTime = microtime(true);
- $endTime = $startTime + $this->timeLimitInSeconds;
-
- $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, $endTime) {
- $handler($envelope);
-
- if ($endTime < microtime(true)) {
- $this->stop();
- if (null !== $this->logger) {
- $this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]);
- }
- }
- });
- }
-
- public function stop(): void
- {
- $this->decoratedReceiver->stop();
- }
-
- public function ack(Envelope $envelope): void
- {
- $this->decoratedReceiver->ack($envelope);
- }
-
- public function reject(Envelope $envelope): void
- {
- $this->decoratedReceiver->reject($envelope);
- }
-}
diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php
index a273b39da95a7..1e7f539f6ce40 100644
--- a/src/Symfony/Component/Messenger/Worker.php
+++ b/src/Symfony/Component/Messenger/Worker.php
@@ -32,112 +32,144 @@
*
* @final
*/
-class Worker
+class Worker implements WorkerInterface
{
- private $receiver;
+ private $receivers;
private $bus;
- private $receiverName;
- private $retryStrategy;
+ private $retryStrategies;
private $eventDispatcher;
private $logger;
+ private $shouldStop = false;
- public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus, string $receiverName = null, RetryStrategyInterface $retryStrategy = null, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
+ /**
+ * @param ReceiverInterface[] $receivers Where the key will be used as the string "identifier"
+ * @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match)
+ */
+ public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
{
- $this->receiver = $receiver;
+ $this->receivers = $receivers;
$this->bus = $bus;
- if (null === $receiverName) {
- @trigger_error(sprintf('Instantiating the "%s" class without passing a third argument is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
-
- $receiverName = 'unknown';
- }
- $this->receiverName = $receiverName;
- $this->retryStrategy = $retryStrategy;
+ $this->retryStrategies = $retryStrategies;
$this->eventDispatcher = $eventDispatcher;
$this->logger = $logger;
}
/**
* Receive the messages and dispatch them to the bus.
+ *
+ * Valid options are:
+ * * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
*/
- public function run()
+ public function run(array $options = [], callable $onHandledCallback = null): void
{
+ $options = array_merge([
+ 'sleep' => 1000000,
+ ], $options);
+
if (\function_exists('pcntl_signal')) {
pcntl_signal(SIGTERM, function () {
- $this->receiver->stop();
+ $this->stop();
});
}
- $this->receiver->receive(function (?Envelope $envelope) {
- if (null === $envelope) {
- if (\function_exists('pcntl_signal_dispatch')) {
- pcntl_signal_dispatch();
- }
+ $onHandled = function (?Envelope $envelope) use ($onHandledCallback) {
+ if (\function_exists('pcntl_signal_dispatch')) {
+ pcntl_signal_dispatch();
+ }
- return;
+ if (null !== $onHandledCallback) {
+ $onHandledCallback($envelope);
}
+ };
- $this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName));
-
- $message = $envelope->getMessage();
- $context = [
- 'message' => $message,
- 'class' => \get_class($message),
- ];
-
- try {
- $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
- } catch (\Throwable $throwable) {
- $shouldRetry = $this->shouldRetry($throwable, $envelope);
-
- $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry));
-
- if ($shouldRetry) {
- if (null === $this->retryStrategy) {
- // not logically allowed, but check just in case
- throw new LogicException('Retrying is not supported without a retry strategy.');
- }
-
- $retryCount = $this->getRetryCount($envelope) + 1;
- if (null !== $this->logger) {
- $this->logger->error('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
- }
-
- // add the delay and retry stamp info + remove ReceivedStamp
- $retryEnvelope = $envelope->with(new DelayStamp($this->retryStrategy->getWaitingTime($envelope)))
- ->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
- ->withoutAll(ReceivedStamp::class);
-
- // re-send the message
- $this->bus->dispatch($retryEnvelope);
- // acknowledge the previous message has received
- $this->receiver->ack($envelope);
- } else {
- if (null !== $this->logger) {
- $this->logger->critical('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
- }
-
- $this->receiver->reject($envelope);
- }
+ while (false === $this->shouldStop) {
+ $envelopeHandled = false;
+ foreach ($this->receivers as $receiverName => $receiver) {
+ $envelopes = $receiver->get();
+
+ foreach ($envelopes as $envelope) {
+ $envelopeHandled = true;
- if (\function_exists('pcntl_signal_dispatch')) {
- pcntl_signal_dispatch();
+ $this->handleMessage($envelope, $receiver, $receiverName, $this->retryStrategies[$receiverName] ?? null);
+ $onHandled($envelope);
}
- return;
+ // after handling a single receiver, quit and start the loop again
+ // this should prevent multiple lower priority receivers from
+ // blocking too long before the higher priority are checked
+ if ($envelopeHandled) {
+ break;
+ }
}
- $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName));
+ if (false === $envelopeHandled) {
+ $onHandled(null);
- if (null !== $this->logger) {
- $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
+ usleep($options['sleep']);
}
+ }
+ }
- $this->receiver->ack($envelope);
+ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $receiverName, ?RetryStrategyInterface $retryStrategy)
+ {
+ $this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $receiverName));
- if (\function_exists('pcntl_signal_dispatch')) {
- pcntl_signal_dispatch();
+ $message = $envelope->getMessage();
+ $context = [
+ 'message' => $message,
+ 'class' => \get_class($message),
+ ];
+
+ try {
+ $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
+ } catch (\Throwable $throwable) {
+ $shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy);
+
+ $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry));
+
+ if ($shouldRetry) {
+ if (null === $retryStrategy) {
+ // not logically allowed, but check just in case
+ throw new LogicException('Retrying is not supported without a retry strategy.');
+ }
+
+ $retryCount = $this->getRetryCount($envelope) + 1;
+ if (null !== $this->logger) {
+ $this->logger->error('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]);
+ }
+
+ // add the delay and retry stamp info + remove ReceivedStamp
+ $retryEnvelope = $envelope->with(new DelayStamp($retryStrategy->getWaitingTime($envelope)))
+ ->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope)))
+ ->withoutAll(ReceivedStamp::class);
+
+ // re-send the message
+ $this->bus->dispatch($retryEnvelope);
+ // acknowledge the previous message has received
+ $receiver->ack($envelope);
+ } else {
+ if (null !== $this->logger) {
+ $this->logger->critical('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]);
+ }
+
+ $receiver->reject($envelope);
}
- });
+
+ return;
+ }
+
+ $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $receiverName));
+
+ if (null !== $this->logger) {
+ $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
+ }
+
+ $receiver->ack($envelope);
+ }
+
+ public function stop(): void
+ {
+ $this->shouldStop = true;
}
private function dispatchEvent($event)
@@ -149,17 +181,17 @@ private function dispatchEvent($event)
$this->eventDispatcher->dispatch($event);
}
- private function shouldRetry(\Throwable $e, Envelope $envelope): bool
+ private function shouldRetry(\Throwable $e, Envelope $envelope, ?RetryStrategyInterface $retryStrategy): bool
{
if ($e instanceof UnrecoverableMessageHandlingException) {
return false;
}
- if (null === $this->retryStrategy) {
+ if (null === $retryStrategy) {
return false;
}
- return $this->retryStrategy->isRetryable($envelope);
+ return $retryStrategy->isRetryable($envelope);
}
private function getRetryCount(Envelope $envelope): int
diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php
new file mode 100644
index 0000000000000..77e848898234d
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php
@@ -0,0 +1,61 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Worker;
+
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\WorkerInterface;
+
+/**
+ * @author Simon Delicata
+ *
+ * @experimental in 4.3
+ */
+class StopWhenMemoryUsageIsExceededWorker implements WorkerInterface
+{
+ private $decoratedWorker;
+ private $memoryLimit;
+ private $logger;
+ private $memoryResolver;
+
+ public function __construct(WorkerInterface $decoratedWorker, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null)
+ {
+ $this->decoratedWorker = $decoratedWorker;
+ $this->memoryLimit = $memoryLimit;
+ $this->logger = $logger;
+ $this->memoryResolver = $memoryResolver ?: function () {
+ return \memory_get_usage();
+ };
+ }
+
+ public function run(array $options = [], callable $onHandledCallback = null): void
+ {
+ $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback) {
+ if (null !== $onHandledCallback) {
+ $onHandledCallback($envelope);
+ }
+
+ $memoryResolver = $this->memoryResolver;
+ if ($memoryResolver() > $this->memoryLimit) {
+ $this->stop();
+ if (null !== $this->logger) {
+ $this->logger->info('Worker stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]);
+ }
+ }
+ });
+ }
+
+ public function stop(): void
+ {
+ $this->decoratedWorker->stop();
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php
new file mode 100644
index 0000000000000..c72e2cc954a78
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php
@@ -0,0 +1,58 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Worker;
+
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\WorkerInterface;
+
+/**
+ * @author Samuel Roze
+ *
+ * @experimental in 4.3
+ */
+class StopWhenMessageCountIsExceededWorker implements WorkerInterface
+{
+ private $decoratedWorker;
+ private $maximumNumberOfMessages;
+ private $logger;
+
+ public function __construct(WorkerInterface $decoratedWorker, int $maximumNumberOfMessages, LoggerInterface $logger = null)
+ {
+ $this->decoratedWorker = $decoratedWorker;
+ $this->maximumNumberOfMessages = $maximumNumberOfMessages;
+ $this->logger = $logger;
+ }
+
+ public function run(array $options = [], callable $onHandledCallback = null): void
+ {
+ $receivedMessages = 0;
+
+ $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) {
+ if (null !== $onHandledCallback) {
+ $onHandledCallback($envelope);
+ }
+
+ if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) {
+ $this->stop();
+ if (null !== $this->logger) {
+ $this->logger->info('Worker stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]);
+ }
+ }
+ });
+ }
+
+ public function stop(): void
+ {
+ $this->decoratedWorker->stop();
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php
new file mode 100644
index 0000000000000..3a4dcf859d859
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php
@@ -0,0 +1,59 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Worker;
+
+use Psr\Log\LoggerInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\WorkerInterface;
+
+/**
+ * @author Simon Delicata
+ *
+ * @experimental in 4.3
+ */
+class StopWhenTimeLimitIsReachedWorker implements WorkerInterface
+{
+ private $decoratedWorker;
+ private $timeLimitInSeconds;
+ private $logger;
+
+ public function __construct(WorkerInterface $decoratedWorker, int $timeLimitInSeconds, LoggerInterface $logger = null)
+ {
+ $this->decoratedWorker = $decoratedWorker;
+ $this->timeLimitInSeconds = $timeLimitInSeconds;
+ $this->logger = $logger;
+ }
+
+ public function run(array $options = [], callable $onHandledCallback = null): void
+ {
+ $startTime = microtime(true);
+ $endTime = $startTime + $this->timeLimitInSeconds;
+
+ $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $endTime) {
+ if (null !== $onHandledCallback) {
+ $onHandledCallback($envelope);
+ }
+
+ if ($endTime < microtime(true)) {
+ $this->stop();
+ if (null !== $this->logger) {
+ $this->logger->info('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]);
+ }
+ }
+ });
+ }
+
+ public function stop(): void
+ {
+ $this->decoratedWorker->stop();
+ }
+}
diff --git a/src/Symfony/Component/Messenger/WorkerInterface.php b/src/Symfony/Component/Messenger/WorkerInterface.php
new file mode 100644
index 0000000000000..a4b3566871cb2
--- /dev/null
+++ b/src/Symfony/Component/Messenger/WorkerInterface.php
@@ -0,0 +1,37 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger;
+
+/**
+ * Interface for Workers that handle messages from transports.
+ *
+ * @experimental in 4.3
+ *
+ * @author Ryan Weaver
+ */
+interface WorkerInterface
+{
+ /**
+ * Receive the messages and dispatch them to the bus.
+ *
+ * The $onHandledCallback will be passed the Envelope that was just
+ * handled or null if nothing was handled.
+ *
+ * @param mixed[] $options options used to control worker behavior
+ */
+ public function run(array $options = [], callable $onHandledCallback = null): void;
+
+ /**
+ * Stop receiving messages.
+ */
+ public function stop(): void;
+}