diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 9638de4e49092..54b22a321f086 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -11,8 +11,9 @@ CHANGELOG to the `Envelope` then find the correct bus when receiving from the transport. See `ConsumeMessagesCommand`. * The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed. - * [BC BREAK] 2 new methods were added to `ReceiverInterface`: - `ack()` and `reject()`. + * [BC BREAK] 3 new methods were added to `ReceiverInterface`: + `ack()`, `reject()` and `get()`. The methods `receive()` + and `stop()` were removed. * [BC BREAK] Error handling was moved from the receivers into `Worker`. Implementations of `ReceiverInterface::handle()` should now allow all exceptions to be thrown, except for transport @@ -24,7 +25,9 @@ CHANGELOG * The default command name for `ConsumeMessagesCommand` was changed from `messenger:consume-messages` to `messenger:consume` * `ConsumeMessagesCommand` has two new optional constructor arguments - * `Worker` has 4 new option constructor arguments. + * [BC BREAK] The first argument to Worker changed from a single + `ReceiverInterface` to an array of `ReceiverInterface`. + * `Worker` has 3 new optional constructor arguments. * The `Worker` class now handles calling `pcntl_signal_dispatch()` the receiver no longer needs to call this. * The `AmqpSender` will now retry messages using a dead-letter exchange diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index e49b28705c876..bd3a7a177c6c3 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -19,12 +19,13 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Console\Question\ChoiceQuestion; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\Messenger\RoutableMessageBus; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver; use Symfony\Component\Messenger\Worker; +use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker; +use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker; +use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; /** @@ -70,10 +71,11 @@ protected function configure(): void $this ->setDefinition([ - new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName), + new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []), new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'), new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'), new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'), + new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1), new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'), ]) ->setDescription('Consumes messages') @@ -82,6 +84,10 @@ protected function configure(): void php %command.full_name% +To receive from multiple transports, pass each name: + + php %command.full_name% receiver1 receiver2 + Use the --limit option to limit the number of messages received: php %command.full_name% --limit=10 @@ -111,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output) { $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); - if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) { - if (null === $receiverName) { - $io->block('Missing receiver argument.', null, 'error', ' ', true); - $input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames)); - } elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) { - $io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true); - if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) { - $input->setArgument('receiver', $alternatives[0]); - } + if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) { + $io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true); + + $io->writeln('Choose which receivers you want to consume messages from in order of priority.'); + if (\count($this->receiverNames) > 1) { + $io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. %s', implode(', ', $this->receiverNames))); } + + $question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0); + $question->setMultiselect(true); + + $input->setArgument('receivers', $io->askQuestion($question)); + } + + if (0 === \count($input->getArgument('receivers'))) { + throw new RuntimeException('Please pass at least one receiver.'); } } @@ -135,16 +147,25 @@ protected function execute(InputInterface $input, OutputInterface $output): void $output->writeln(sprintf('%s', $message)); } - if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) { - throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName)); - } + $receivers = []; + $retryStrategies = []; + foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) { + if (!$this->receiverLocator->has($receiverName)) { + $message = sprintf('The receiver "%s" does not exist.', $receiverName); + if ($this->receiverNames) { + $message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames)); + } - if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) { - throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName)); - } + throw new RuntimeException($message); + } - $receiver = $this->receiverLocator->get($receiverName); - $retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null; + if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) { + throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName)); + } + + $receivers[$receiverName] = $this->receiverLocator->get($receiverName); + $retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null; + } if (null !== $input->getOption('bus')) { $bus = $this->busLocator->get($input->getOption('bus')); @@ -152,24 +173,25 @@ protected function execute(InputInterface $input, OutputInterface $output): void $bus = new RoutableMessageBus($this->busLocator); } + $worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger); $stopsWhen = []; if ($limit = $input->getOption('limit')) { $stopsWhen[] = "processed {$limit} messages"; - $receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger); + $worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger); } if ($memoryLimit = $input->getOption('memory-limit')) { $stopsWhen[] = "exceeded {$memoryLimit} of memory"; - $receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger); + $worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger); } if ($timeLimit = $input->getOption('time-limit')) { $stopsWhen[] = "been running for {$timeLimit}s"; - $receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger); + $worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger); } $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); - $io->success(sprintf('Consuming messages from transport "%s".', $receiverName)); + $io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames))); if ($stopsWhen) { $last = array_pop($stopsWhen); @@ -183,8 +205,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void $io->comment('Re-run the command with a -vv option to see logs about consumed messages.'); } - $worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger); - $worker->run(); + $worker->run([ + 'sleep' => $input->getOption('sleep') * 1000000, + ]); } private function convertToBytes(string $memoryLimit): int diff --git a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php index 336cd8129cf4c..e7ce90b85c0bf 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php @@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase public function testConfigurationWithDefaultReceiver() { $command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']); - $inputArgument = $command->getDefinition()->getArgument('receiver'); + $inputArgument = $command->getDefinition()->getArgument('receivers'); $this->assertFalse($inputArgument->isRequired()); - $this->assertSame('amqp', $inputArgument->getDefault()); - } - - public function testConfigurationWithoutDefaultReceiver() - { - $command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']); - $inputArgument = $command->getDefinition()->getArgument('receiver'); - $this->assertTrue($inputArgument->isRequired()); - $this->assertNull($inputArgument->getDefault()); + $this->assertSame(['amqp'], $inputArgument->getDefault()); } } diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php index 4e32ba9b49aab..51e7cbc88c1f3 100644 --- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php +++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php @@ -612,11 +612,9 @@ public function __invoke(DummyMessage $message): void class DummyReceiver implements ReceiverInterface { - public function receive(callable $handler): void + public function get(): iterable { - for ($i = 0; $i < 3; ++$i) { - $handler(new Envelope(new DummyMessage("Dummy $i"))); - } + yield new Envelope(new DummyMessage('Dummy')); } public function stop(): void diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php b/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php deleted file mode 100644 index b1d26934d252c..0000000000000 --- a/src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php +++ /dev/null @@ -1,48 +0,0 @@ -callable = $callable; - } - - public function receive(callable $handler): void - { - $callable = $this->callable; - $callable($handler); - } - - public function stop(): void - { - } - - public function ack(Envelope $envelope): void - { - ++$this->acknowledgeCount; - } - - public function reject(Envelope $envelope): void - { - ++$this->rejectCount; - } - - public function getAcknowledgeCount(): int - { - return $this->acknowledgeCount; - } - - public function getRejectCount(): int - { - return $this->rejectCount; - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php new file mode 100644 index 0000000000000..2c66bdedbeba6 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php @@ -0,0 +1,46 @@ +envelopesToReceive = $envelopesToReceive; + } + + public function run(array $options = [], callable $onHandledCallback = null): void + { + foreach ($this->envelopesToReceive as $envelope) { + if (true === $this->isStopped) { + break; + } + + if ($onHandledCallback) { + $onHandledCallback($envelope); + ++$this->envelopesHandled; + } + } + } + + public function stop(): void + { + $this->isStopped = true; + } + + public function isStopped(): bool + { + return $this->isStopped; + } + + public function countEnvelopesHandled() + { + return $this->envelopesHandled; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php index 2949ae837fe36..b2ebfe4ab5043 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php @@ -57,16 +57,20 @@ public function testItSendsAndReceivesMessages() $sender->send($first = new Envelope(new DummyMessage('First'))); $sender->send($second = new Envelope(new DummyMessage('Second'))); - $receivedMessages = 0; - $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) { - $expectedEnvelope = 0 === $receivedMessages ? $first : $second; - $this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage()); - $this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class)); - - if (2 === ++$receivedMessages) { - $receiver->stop(); - } - }); + $envelopes = iterator_to_array($receiver->get()); + $this->assertCount(1, $envelopes); + /** @var Envelope $envelope */ + $envelope = $envelopes[0]; + $this->assertEquals($first->getMessage(), $envelope->getMessage()); + $this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class)); + + $envelopes = iterator_to_array($receiver->get()); + $this->assertCount(1, $envelopes); + /** @var Envelope $envelope */ + $envelope = $envelopes[0]; + $this->assertEquals($second->getMessage(), $envelope->getMessage()); + + $this->assertEmpty(iterator_to_array($receiver->get())); } public function testRetryAndDelay() @@ -82,50 +86,38 @@ public function testRetryAndDelay() $sender->send($first = new Envelope(new DummyMessage('First'))); - $receivedMessages = 0; - $startTime = time(); - $receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) { - if (null === $envelope) { - // if we have been processing for 4 seconds + have received 2 messages - // then it's safe to say no other messages will be received - if (time() > $startTime + 4 && 2 === $receivedMessages) { - $receiver->stop(); - } - - return; - } - - ++$receivedMessages; - - // retry the first time - if (1 === $receivedMessages) { - // imitate what Worker does - $envelope = $envelope - ->with(new DelayStamp(2000)) - ->with(new RedeliveryStamp(1, 'not_important')); - $sender->send($envelope); - $receiver->ack($envelope); + $envelopes = iterator_to_array($receiver->get()); + /** @var Envelope $envelope */ + $envelope = $envelopes[0]; + $newEnvelope = $envelope + ->with(new DelayStamp(2000)) + ->with(new RedeliveryStamp(1, 'not_important')); + $sender->send($newEnvelope); + $receiver->ack($envelope); - return; - } + $envelopes = []; + $startTime = time(); + // wait for next message, but only for max 3 seconds + while (0 === \count($envelopes) && $startTime + 3 > time()) { + $envelopes = iterator_to_array($receiver->get()); + } - if (2 === $receivedMessages) { - // should have a 2 second delay - $this->assertGreaterThanOrEqual($startTime + 2, time()); - // but only a 2 second delay - $this->assertLessThan($startTime + 4, time()); + $this->assertCount(1, $envelopes); + /** @var Envelope $envelope */ + $envelope = $envelopes[0]; - /** @var RedeliveryStamp|null $retryStamp */ - // verify the stamp still exists from the last send - $retryStamp = $envelope->last(RedeliveryStamp::class); - $this->assertNotNull($retryStamp); - $this->assertSame(1, $retryStamp->getRetryCount()); + // should have a 2 second delay + $this->assertGreaterThanOrEqual($startTime + 2, time()); + // but only a 2 second delay + $this->assertLessThan($startTime + 4, time()); - $receiver->ack($envelope); + /** @var RedeliveryStamp|null $retryStamp */ + // verify the stamp still exists from the last send + $retryStamp = $envelope->last(RedeliveryStamp::class); + $this->assertNotNull($retryStamp); + $this->assertSame(1, $retryStamp->getRetryCount()); - return; - } - }); + $receiver->ack($envelope); } public function testItReceivesSignals() @@ -175,29 +167,6 @@ public function testItReceivesSignals() , $process->getOutput()); } - /** - * @runInSeparateProcess - */ - public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler() - { - $serializer = $this->createSerializer(); - - $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']); - $connection->setup(); - $connection->queue()->purge(); - - $receiver = new AmqpReceiver($connection, $serializer); - - $receivedMessages = 0; - $receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) { - $this->assertNull($envelope); - - if (2 === ++$receivedMessages) { - $receiver->stop(); - } - }); - } - private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10) { $timedOutTime = time() + $timeoutInSeconds; diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php index d0c8abfa3564e..d27ddb9cd26a9 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php @@ -14,9 +14,11 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp; use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver; use Symfony\Component\Messenger\Transport\AmqpExt\Connection; use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; @@ -26,7 +28,7 @@ */ class AmqpReceiverTest extends TestCase { - public function testItSendTheDecodedMessageToTheHandler() + public function testItReturnsTheDecodedMessageToTheHandler() { $serializer = new Serializer( new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) @@ -37,10 +39,9 @@ public function testItSendTheDecodedMessageToTheHandler() $connection->method('get')->willReturn($amqpEnvelope); $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function (?Envelope $envelope) use ($receiver) { - $this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage()); - $receiver->stop(); - }); + $actualEnvelopes = iterator_to_array($receiver->get()); + $this->assertCount(1, $actualEnvelopes); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } /** @@ -48,20 +49,14 @@ public function testItSendTheDecodedMessageToTheHandler() */ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) - ); - + $serializer = $this->createMock(SerializerInterface::class); $amqpEnvelope = $this->createAMQPEnvelope(); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection->method('get')->willReturn($amqpEnvelope); $connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException()); $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function (?Envelope $envelope) use ($receiver) { - $receiver->ack($envelope); - $receiver->stop(); - }); + $receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope))); } /** @@ -69,20 +64,14 @@ public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() */ public function testItThrowsATransportExceptionIfItCannotRejectMessage() { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) - ); - + $serializer = $this->createMock(SerializerInterface::class); $amqpEnvelope = $this->createAMQPEnvelope(); $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection->method('get')->willReturn($amqpEnvelope); $connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException()); $receiver = new AmqpReceiver($connection, $serializer); - $receiver->receive(function (?Envelope $envelope) use ($receiver) { - $receiver->reject($envelope); - $receiver->stop(); - }); + $receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope))); } private function createAMQPEnvelope() diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php index c343c29226369..b3e8b5729dd70 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php @@ -47,11 +47,8 @@ public function testReceivesMessages() $serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage)); $connection->method('get')->willReturn($amqpEnvelope); - $transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) { - $this->assertSame($decodedMessage, $envelope->getMessage()); - - $transport->stop(); - }); + $envelopes = iterator_to_array($transport->get()); + $this->assertSame($decodedMessage, $envelopes[0]->getMessage()); } private function getTransport(SerializerInterface $serializer = null, Connection $connection = null) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php index ba7236103579b..f51412093451c 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php @@ -32,7 +32,7 @@ $receiver = new AmqpReceiver($connection, $serializer); $retryStrategy = new MultiplierRetryStrategy(3, 0); -$worker = new Worker($receiver, new class() implements MessageBusInterface { +$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface { public function dispatch($envelope): Envelope { echo 'Get envelope with message: '.\get_class($envelope->getMessage())."\n"; @@ -43,7 +43,7 @@ public function dispatch($envelope): Envelope return $envelope; } -}, 'the_receiver', $retryStrategy); +}); echo "Receiving messages...\n"; $worker->run(); diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php deleted file mode 100644 index 27314e75502e7..0000000000000 --- a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiverTest.php +++ /dev/null @@ -1,84 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Transport\Receiver; - -use PHPUnit\Framework\TestCase; -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; -use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver; - -class StopWhenMemoryUsageIsExceededReceiverTest extends TestCase -{ - /** - * @dataProvider memoryProvider - */ - public function testReceiverStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop) - { - $callable = function ($handler) { - $handler(new Envelope(new DummyMessage('API'))); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - if (true === $shouldStop) { - $decoratedReceiver->expects($this->once())->method('stop'); - } else { - $decoratedReceiver->expects($this->never())->method('stop'); - } - - $memoryResolver = function () use ($memoryUsage) { - return $memoryUsage; - }; - - $memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, $memoryLimit, null, $memoryResolver); - $memoryLimitReceiver->receive(function () {}); - } - - public function memoryProvider() - { - yield [2048, 1024, true]; - yield [1024, 1024, false]; - yield [1024, 2048, false]; - } - - public function testReceiverLogsMemoryExceededWhenLoggerIsGiven() - { - $callable = function ($handler) { - $handler(new Envelope(new DummyMessage('API'))); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - $decoratedReceiver->expects($this->once())->method('stop'); - - $logger = $this->createMock(LoggerInterface::class); - $logger->expects($this->once())->method('info') - ->with('Receiver stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]); - - $memoryResolver = function () { - return 70 * 1024 * 1024; - }; - - $memoryLimitReceiver = new StopWhenMemoryUsageIsExceededReceiver($decoratedReceiver, 64 * 1024 * 1024, $logger, $memoryResolver); - $memoryLimitReceiver->receive(function () {}); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php deleted file mode 100644 index 1a303728a94e4..0000000000000 --- a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenMessageCountIsExceededReceiverTest.php +++ /dev/null @@ -1,103 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Transport\Receiver; - -use PHPUnit\Framework\TestCase; -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; -use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver; - -class StopWhenMessageCountIsExceededReceiverTest extends TestCase -{ - /** - * @dataProvider countProvider - */ - public function testReceiverStopsWhenMaximumCountExceeded($max, $shouldStop) - { - $callable = function ($handler) { - $handler(new Envelope(new DummyMessage('First message'))); - $handler(new Envelope(new DummyMessage('Second message'))); - $handler(new Envelope(new DummyMessage('Third message'))); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - if (true === $shouldStop) { - $decoratedReceiver->expects($this->any())->method('stop'); - } else { - $decoratedReceiver->expects($this->never())->method('stop'); - } - - $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, $max); - $maximumCountReceiver->receive(function () {}); - } - - public function countProvider() - { - yield [1, true]; - yield [2, true]; - yield [3, true]; - yield [4, false]; - } - - public function testReceiverDoesntIncreaseItsCounterWhenReceiveNullMessage() - { - $callable = function ($handler) { - $handler(null); - $handler(null); - $handler(null); - $handler(null); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - $decoratedReceiver->expects($this->never())->method('stop'); - - $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1); - $maximumCountReceiver->receive(function () {}); - } - - public function testReceiverLogsMaximumCountExceededWhenLoggerIsGiven() - { - $callable = function ($handler) { - $handler(new Envelope(new DummyMessage('First message'))); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - $decoratedReceiver->expects($this->once())->method('stop'); - - $logger = $this->createMock(LoggerInterface::class); - $logger->expects($this->once())->method('info') - ->with( - $this->equalTo('Receiver stopped due to maximum count of {count} exceeded'), - $this->equalTo(['count' => 1]) - ); - - $maximumCountReceiver = new StopWhenMessageCountIsExceededReceiver($decoratedReceiver, 1, $logger); - $maximumCountReceiver->receive(function () {}); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php deleted file mode 100644 index 472703fe6f7f1..0000000000000 --- a/src/Symfony/Component/Messenger/Tests/Transport/Receiver/StopWhenTimeLimitIsReachedReceiverTest.php +++ /dev/null @@ -1,49 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Tests\Transport\Receiver; - -use PHPUnit\Framework\TestCase; -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; -use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver; - -class StopWhenTimeLimitIsReachedReceiverTest extends TestCase -{ - /** - * @group time-sensitive - */ - public function testReceiverStopsWhenTimeLimitIsReached() - { - $callable = function ($handler) { - $handler(new Envelope(new DummyMessage('API'))); - }; - - $decoratedReceiver = $this->getMockBuilder(CallbackReceiver::class) - ->setConstructorArgs([$callable]) - ->enableProxyingToOriginalMethods() - ->getMock(); - - $decoratedReceiver->expects($this->once())->method('receive'); - $decoratedReceiver->expects($this->once())->method('stop'); - - $logger = $this->createMock(LoggerInterface::class); - $logger->expects($this->once())->method('info') - ->with('Receiver stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]); - - $timeoutReceiver = new StopWhenTimeLimitIsReachedReceiver($decoratedReceiver, 1, $logger); - $timeoutReceiver->receive(function () { - sleep(2); - }); - } -} diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php new file mode 100644 index 0000000000000..d8a80e37a1468 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php @@ -0,0 +1,71 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Worker; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; +use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker; + +class StopWhenMemoryUsageIsExceededWorkerTest extends TestCase +{ + /** + * @dataProvider memoryProvider + */ + public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $memoryLimit, bool $shouldStop) + { + $handlerCalledTimes = 0; + $handledCallback = function () use (&$handlerCalledTimes) { + ++$handlerCalledTimes; + }; + $decoratedWorker = new DummyWorker([ + new Envelope(new \stdClass()), + ]); + + $memoryResolver = function () use ($memoryUsage) { + return $memoryUsage; + }; + + $memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver); + $memoryLimitWorker->run([], $handledCallback); + + // handler should be called exactly 1 time + $this->assertSame($handlerCalledTimes, 1); + $this->assertSame($shouldStop, $decoratedWorker->isStopped()); + } + + public function memoryProvider() + { + yield [2048, 1024, true]; + yield [1024, 1024, false]; + yield [1024, 2048, false]; + } + + public function testWorkerLogsMemoryExceededWhenLoggerIsGiven() + { + $decoratedWorker = new DummyWorker([ + new Envelope(new \stdClass()), + ]); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Worker stopped due to memory limit of {limit} exceeded', ['limit' => 64 * 1024 * 1024]); + + $memoryResolver = function () { + return 70 * 1024 * 1024; + }; + + $memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, 64 * 1024 * 1024, $logger, $memoryResolver); + $memoryLimitWorker->run(); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php new file mode 100644 index 0000000000000..f22d4524bfd37 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php @@ -0,0 +1,71 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Worker; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; +use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker; + +class StopWhenMessageCountIsExceededWorkerTest extends TestCase +{ + /** + * @dataProvider countProvider + */ + public function testWorkerStopsWhenMaximumCountExceeded($max, $shouldStop) + { + $handlerCalledTimes = 0; + $handledCallback = function () use (&$handlerCalledTimes) { + ++$handlerCalledTimes; + }; + // receive 3 real messages + $decoratedWorker = new DummyWorker([ + new Envelope(new DummyMessage('First message')), + null, + new Envelope(new DummyMessage('Second message')), + null, + new Envelope(new DummyMessage('Third message')), + ]); + + $maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max); + $maximumCountWorker->run([], $handledCallback); + + $this->assertSame($shouldStop, $decoratedWorker->isStopped()); + } + + public function countProvider() + { + yield [1, true]; + yield [2, true]; + yield [3, true]; + yield [4, false]; + } + + public function testWorkerLogsMaximumCountExceededWhenLoggerIsGiven() + { + $decoratedWorker = new DummyWorker([ + new Envelope(new \stdClass()), + ]); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with( + $this->equalTo('Worker stopped due to maximum count of {count} exceeded'), + $this->equalTo(['count' => 1]) + ); + + $maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, 1, $logger); + $maximumCountWorker->run(); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php new file mode 100644 index 0000000000000..4b06c7392b4a6 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php @@ -0,0 +1,44 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Worker; + +use PHPUnit\Framework\TestCase; +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker; +use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker; + +class StopWhenTimeLimitIsReachedWorkerTest extends TestCase +{ + /** + * @group time-sensitive + */ + public function testWorkerStopsWhenTimeLimitIsReached() + { + $decoratedWorker = new DummyWorker([ + new Envelope(new \stdClass()), + new Envelope(new \stdClass()), + ]); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('info') + ->with('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]); + + $timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger); + $timeoutWorker->run([], function () { + sleep(2); + }); + + $this->assertTrue($decoratedWorker->isStopped()); + $this->assertSame(1, $decoratedWorker->countEnvelopesHandled()); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 4ecb7719cbaef..b5c6a6557b2cd 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -22,11 +22,14 @@ use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentStamp; -use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; +/** + * @group time-sensitive + */ class WorkerTest extends TestCase { public function testWorkerDispatchTheReceivedMessage() @@ -34,18 +37,22 @@ public function testWorkerDispatchTheReceivedMessage() $apiMessage = new DummyMessage('API'); $ipaMessage = new DummyMessage('IPA'); - $receiver = new CallbackReceiver(function ($handler) use ($apiMessage, $ipaMessage) { - $handler(new Envelope($apiMessage)); - $handler(new Envelope($ipaMessage)); - }); + $receiver = new DummyReceiver([ + [new Envelope($apiMessage), new Envelope($ipaMessage)], + ]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->at(0))->method('dispatch')->with($envelope = new Envelope($apiMessage, new ReceivedStamp()))->willReturn($envelope); $bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope); - $worker = new Worker($receiver, $bus, 'receiver_id'); - $worker->run(); + $worker = new Worker([$receiver], $bus); + $worker->run([], function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); $this->assertSame(2, $receiver->getAcknowledgeCount()); } @@ -53,24 +60,26 @@ public function testWorkerDispatchTheReceivedMessage() public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage() { $envelope = new Envelope(new DummyMessage('API')); - $receiver = new CallbackReceiver(function ($handler) use ($envelope) { - $handler($envelope); - }); + $receiver = new DummyReceiver([[$envelope]]); $envelope = $envelope->with(new ReceivedStamp()); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope); - $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); - $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); - $worker->run(); + $worker = new Worker([$receiver], $bus, []); + $worker->run([], function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); } public function testDispatchCausesRetry() { - $receiver = new CallbackReceiver(function ($handler) { - $handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))); - }); + $receiver = new DummyReceiver([ + [new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))], + ]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->at(0))->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); @@ -93,8 +102,13 @@ public function testDispatchCausesRetry() $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); $retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true); - $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); - $worker->run(); + $worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]); + $worker->run([], function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); // old message acknowledged $this->assertSame(1, $receiver->getAcknowledgeCount()); @@ -102,9 +116,9 @@ public function testDispatchCausesRetry() public function testDispatchCausesRejectWhenNoRetry() { - $receiver = new CallbackReceiver(function ($handler) { - $handler(new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))); - }); + $receiver = new DummyReceiver([ + [new Envelope(new DummyMessage('Hello'), new SentStamp('Some\Sender', 'sender_alias'))], + ]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); @@ -112,17 +126,22 @@ public function testDispatchCausesRejectWhenNoRetry() $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); $retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false); - $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); - $worker->run(); + $worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]); + $worker->run([], function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); $this->assertSame(1, $receiver->getRejectCount()); $this->assertSame(0, $receiver->getAcknowledgeCount()); } public function testDispatchCausesRejectOnUnrecoverableMessage() { - $receiver = new CallbackReceiver(function ($handler) { - $handler(new Envelope(new DummyMessage('Hello'))); - }); + $receiver = new DummyReceiver([ + [new Envelope(new DummyMessage('Hello'))], + ]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->method('dispatch')->willThrowException(new UnrecoverableMessageHandlingException('Will never work')); @@ -130,36 +149,42 @@ public function testDispatchCausesRejectOnUnrecoverableMessage() $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); $retryStrategy->expects($this->never())->method('isRetryable'); - $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); - $worker->run(); + $worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]); + $worker->run([], function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); $this->assertSame(1, $receiver->getRejectCount()); } public function testWorkerDoesNotSendNullMessagesToTheBus() { - $receiver = new CallbackReceiver(function ($handler) { - $handler(null); - }); + $receiver = new DummyReceiver([ + null, + ]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->never())->method('dispatch'); - $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); - $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy); - $worker->run(); + $worker = new Worker([$receiver], $bus); + $worker->run([], function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); } public function testWorkerDispatchesEventsOnSuccess() { $envelope = new Envelope(new DummyMessage('Hello')); - $receiver = new CallbackReceiver(function ($handler) use ($envelope) { - $handler($envelope); - }); + $receiver = new DummyReceiver([[$envelope]]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->method('dispatch')->willReturn($envelope); - $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); $eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock(); $eventDispatcher->expects($this->exactly(2)) @@ -169,22 +194,24 @@ public function testWorkerDispatchesEventsOnSuccess() [$this->isInstanceOf(WorkerMessageHandledEvent::class)] ); - $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher); - $worker->run(); + $worker = new Worker([$receiver], $bus, [], $eventDispatcher); + $worker->run([], function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); } public function testWorkerDispatchesEventsOnError() { $envelope = new Envelope(new DummyMessage('Hello')); - $receiver = new CallbackReceiver(function ($handler) use ($envelope) { - $handler($envelope); - }); + $receiver = new DummyReceiver([[$envelope]]); $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $exception = new \InvalidArgumentException('Oh no!'); $bus->method('dispatch')->willThrowException($exception); - $retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock(); $eventDispatcher = $this->getMockBuilder(EventDispatcherInterface::class)->getMock(); $eventDispatcher->expects($this->exactly(2)) @@ -194,7 +221,143 @@ public function testWorkerDispatchesEventsOnError() [$this->isInstanceOf(WorkerMessageFailedEvent::class)] ); - $worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher); - $worker->run(); + $worker = new Worker([$receiver], $bus, [], $eventDispatcher); + $worker->run([], function (?Envelope $envelope) use ($worker) { + // stop after the messages finish + if (null === $envelope) { + $worker->stop(); + } + }); + } + + public function testTimeoutIsConfigurable() + { + $apiMessage = new DummyMessage('API'); + $receiver = new DummyReceiver([ + [new Envelope($apiMessage), new Envelope($apiMessage)], + [], // will cause a wait + [], // will cause a wait + [new Envelope($apiMessage)], + [new Envelope($apiMessage)], + [], // will cause a wait + [new Envelope($apiMessage)], + ]); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $worker = new Worker([$receiver], $bus); + $receivedCount = 0; + $startTime = microtime(true); + // sleep .1 after each idle + $worker->run(['sleep' => 100000], function (?Envelope $envelope) use ($worker, &$receivedCount, $startTime) { + if (null !== $envelope) { + ++$receivedCount; + } + + if (5 === $receivedCount) { + $worker->stop(); + $duration = microtime(true) - $startTime; + + // wait time should be .3 seconds + // use .29 & .31 for timing "wiggle room" + $this->assertGreaterThanOrEqual(.29, $duration); + $this->assertLessThan(.31, $duration); + } + }); + } + + public function testWorkerWithMultipleReceivers() + { + // envelopes, in their expected delivery order + $envelope1 = new Envelope(new DummyMessage('message1')); + $envelope2 = new Envelope(new DummyMessage('message2')); + $envelope3 = new Envelope(new DummyMessage('message3')); + $envelope4 = new Envelope(new DummyMessage('message4')); + $envelope5 = new Envelope(new DummyMessage('message5')); + $envelope6 = new Envelope(new DummyMessage('message6')); + + /* + * Round 1) receiver 1 & 2 have nothing, receiver 3 processes envelope1 and envelope2 + * Round 2) receiver 1 has nothing, receiver 2 processes envelope3, receiver 3 is not called + * Round 3) receiver 1 processes envelope 4, receivers 2 & 3 are not called + * Round 4) receiver 1 processes envelope 5, receivers 2 & 3 are not called + * Round 5) receiver 1 has nothing, receiver 2 has nothing, receiver 3 has envelope 6 + */ + $receiver1 = new DummyReceiver([ + [], + [], + [$envelope4], + [$envelope5], + [], + ]); + $receiver2 = new DummyReceiver([ + [], + [$envelope3], + [], + ]); + $receiver3 = new DummyReceiver([ + [$envelope1, $envelope2], + [], + [$envelope6], + ]); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $receivedCount = 0; + $worker = new Worker([$receiver1, $receiver2, $receiver3], $bus); + $processedEnvelopes = []; + $worker->run([], function (?Envelope $envelope) use ($worker, &$receivedCount, &$processedEnvelopes) { + if (null !== $envelope) { + $processedEnvelopes[] = $envelope; + ++$receivedCount; + } + + // stop after the messages finish + if (6 === $receivedCount) { + $worker->stop(); + } + }); + + // make sure they were processed in the correct order + $this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes); + } +} + +class DummyReceiver implements ReceiverInterface +{ + private $deliveriesOfEnvelopes; + private $acknowledgeCount = 0; + private $rejectCount = 0; + + public function __construct(array $deliveriesOfEnvelopes) + { + $this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes; + } + + public function get(): iterable + { + $val = array_shift($this->deliveriesOfEnvelopes); + + return null === $val ? [] : $val; + } + + public function ack(Envelope $envelope): void + { + ++$this->acknowledgeCount; + } + + public function reject(Envelope $envelope): void + { + ++$this->rejectCount; + } + + public function getAcknowledgeCount(): int + { + return $this->acknowledgeCount; + } + + public function getRejectCount(): int + { + return $this->rejectCount; } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index 93afbaff5d8b8..ce1918e1da0c1 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -30,7 +30,6 @@ class AmqpReceiver implements ReceiverInterface { private $serializer; private $connection; - private $shouldStop; public function __construct(Connection $connection, SerializerInterface $serializer = null) { @@ -41,38 +40,31 @@ public function __construct(Connection $connection, SerializerInterface $seriali /** * {@inheritdoc} */ - public function receive(callable $handler): void + public function get(): iterable { - while (!$this->shouldStop) { - try { - $amqpEnvelope = $this->connection->get(); - } catch (\AMQPException $exception) { - throw new TransportException($exception->getMessage(), 0, $exception); - } - - if (null === $amqpEnvelope) { - $handler(null); - - usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? 200000); - - continue; - } - - try { - $envelope = $this->serializer->decode([ - 'body' => $amqpEnvelope->getBody(), - 'headers' => $amqpEnvelope->getHeaders(), - ]); - } catch (MessageDecodingFailedException $exception) { - // invalid message of some type - $this->rejectAmqpEnvelope($amqpEnvelope); - - throw $exception; - } - - $envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope)); - $handler($envelope); + try { + $amqpEnvelope = $this->connection->get(); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + + if (null === $amqpEnvelope) { + return []; + } + + try { + $envelope = $this->serializer->decode([ + 'body' => $amqpEnvelope->getBody(), + 'headers' => $amqpEnvelope->getHeaders(), + ]); + } catch (MessageDecodingFailedException $exception) { + // invalid message of some type + $this->rejectAmqpEnvelope($amqpEnvelope); + + throw $exception; } + + yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope)); } public function ack(Envelope $envelope): void @@ -89,11 +81,6 @@ public function reject(Envelope $envelope): void $this->rejectAmqpEnvelope($this->findAmqpEnvelope($envelope)); } - public function stop(): void - { - $this->shouldStop = true; - } - private function rejectAmqpEnvelope(\AMQPEnvelope $amqpEnvelope): void { try { diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php index 2146bd900d29c..586e16c4d86dd 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php @@ -38,17 +38,9 @@ public function __construct(Connection $connection, SerializerInterface $seriali /** * {@inheritdoc} */ - public function receive(callable $handler): void + public function get(): iterable { - ($this->receiver ?? $this->getReceiver())->receive($handler); - } - - /** - * {@inheritdoc} - */ - public function stop(): void - { - ($this->receiver ?? $this->getReceiver())->stop(); + return ($this->receiver ?? $this->getReceiver())->get(); } /** diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php index 29da741a11f34..d974ff00b7902 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php @@ -25,21 +25,25 @@ interface ReceiverInterface /** * Receive some messages to the given handler. * - * The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message. - * Note that this envelope can be `null` if the timeout to receive something has expired. + * While this method could return an unlimited number of messages, + * the intention is that it returns only one, or a "small number" + * of messages each time. This gives the user more flexibility: + * they can finish processing the one (or "small number") of messages + * from this receiver and move on to check other receivers for messages. + * If a this method returns too many messages, it could cause a + * blocking effect where handling the messages received from one + * call to get() takes a long time, blocking other receivers from + * being called. * - * If the received message cannot be decoded, the message should not + * If a received message cannot be decoded, the message should not * be retried again (e.g. if there's a queue, it should be removed) * and a MessageDecodingFailedException should be thrown. * * @throws TransportException If there is an issue communicating with the transport + * + * @return Envelope[] */ - public function receive(callable $handler): void; - - /** - * Stop receiving some messages. - */ - public function stop(): void; + public function get(): iterable; /** * Acknowledge that the passed message was handled. diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php deleted file mode 100644 index 09af4673b87b4..0000000000000 --- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMemoryUsageIsExceededReceiver.php +++ /dev/null @@ -1,68 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Transport\Receiver; - -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; - -/** - * @author Simon Delicata - * - * @experimental in 4.2 - */ -class StopWhenMemoryUsageIsExceededReceiver implements ReceiverInterface -{ - private $decoratedReceiver; - private $memoryLimit; - private $logger; - private $memoryResolver; - - public function __construct(ReceiverInterface $decoratedReceiver, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null) - { - $this->decoratedReceiver = $decoratedReceiver; - $this->memoryLimit = $memoryLimit; - $this->logger = $logger; - $this->memoryResolver = $memoryResolver ?: function () { - return \memory_get_usage(); - }; - } - - public function receive(callable $handler): void - { - $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) { - $handler($envelope); - - $memoryResolver = $this->memoryResolver; - if ($memoryResolver() > $this->memoryLimit) { - $this->stop(); - if (null !== $this->logger) { - $this->logger->info('Receiver stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]); - } - } - }); - } - - public function stop(): void - { - $this->decoratedReceiver->stop(); - } - - public function ack(Envelope $envelope): void - { - $this->decoratedReceiver->ack($envelope); - } - - public function reject(Envelope $envelope): void - { - $this->decoratedReceiver->reject($envelope); - } -} diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php deleted file mode 100644 index 8be38d157e8f1..0000000000000 --- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenMessageCountIsExceededReceiver.php +++ /dev/null @@ -1,65 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Transport\Receiver; - -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; - -/** - * @author Samuel Roze - * - * @experimental in 4.2 - */ -class StopWhenMessageCountIsExceededReceiver implements ReceiverInterface -{ - private $decoratedReceiver; - private $maximumNumberOfMessages; - private $logger; - - public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages, LoggerInterface $logger = null) - { - $this->decoratedReceiver = $decoratedReceiver; - $this->maximumNumberOfMessages = $maximumNumberOfMessages; - $this->logger = $logger; - } - - public function receive(callable $handler): void - { - $receivedMessages = 0; - - $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, &$receivedMessages) { - $handler($envelope); - - if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) { - $this->stop(); - if (null !== $this->logger) { - $this->logger->info('Receiver stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]); - } - } - }); - } - - public function stop(): void - { - $this->decoratedReceiver->stop(); - } - - public function ack(Envelope $envelope): void - { - $this->decoratedReceiver->ack($envelope); - } - - public function reject(Envelope $envelope): void - { - $this->decoratedReceiver->reject($envelope); - } -} diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php b/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php deleted file mode 100644 index ade088b7dabb1..0000000000000 --- a/src/Symfony/Component/Messenger/Transport/Receiver/StopWhenTimeLimitIsReachedReceiver.php +++ /dev/null @@ -1,66 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Transport\Receiver; - -use Psr\Log\LoggerInterface; -use Symfony\Component\Messenger\Envelope; - -/** - * @author Simon Delicata - * - * @experimental in 4.2 - */ -class StopWhenTimeLimitIsReachedReceiver implements ReceiverInterface -{ - private $decoratedReceiver; - private $timeLimitInSeconds; - private $logger; - - public function __construct(ReceiverInterface $decoratedReceiver, int $timeLimitInSeconds, LoggerInterface $logger = null) - { - $this->decoratedReceiver = $decoratedReceiver; - $this->timeLimitInSeconds = $timeLimitInSeconds; - $this->logger = $logger; - } - - public function receive(callable $handler): void - { - $startTime = microtime(true); - $endTime = $startTime + $this->timeLimitInSeconds; - - $this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler, $endTime) { - $handler($envelope); - - if ($endTime < microtime(true)) { - $this->stop(); - if (null !== $this->logger) { - $this->logger->info('Receiver stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]); - } - } - }); - } - - public function stop(): void - { - $this->decoratedReceiver->stop(); - } - - public function ack(Envelope $envelope): void - { - $this->decoratedReceiver->ack($envelope); - } - - public function reject(Envelope $envelope): void - { - $this->decoratedReceiver->reject($envelope); - } -} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index a273b39da95a7..1e7f539f6ce40 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -32,112 +32,144 @@ * * @final */ -class Worker +class Worker implements WorkerInterface { - private $receiver; + private $receivers; private $bus; - private $receiverName; - private $retryStrategy; + private $retryStrategies; private $eventDispatcher; private $logger; + private $shouldStop = false; - public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus, string $receiverName = null, RetryStrategyInterface $retryStrategy = null, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null) + /** + * @param ReceiverInterface[] $receivers Where the key will be used as the string "identifier" + * @param RetryStrategyInterface[] $retryStrategies Retry strategies for each receiver (array keys must match) + */ + public function __construct(array $receivers, MessageBusInterface $bus, $retryStrategies = [], EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null) { - $this->receiver = $receiver; + $this->receivers = $receivers; $this->bus = $bus; - if (null === $receiverName) { - @trigger_error(sprintf('Instantiating the "%s" class without passing a third argument is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED); - - $receiverName = 'unknown'; - } - $this->receiverName = $receiverName; - $this->retryStrategy = $retryStrategy; + $this->retryStrategies = $retryStrategies; $this->eventDispatcher = $eventDispatcher; $this->logger = $logger; } /** * Receive the messages and dispatch them to the bus. + * + * Valid options are: + * * sleep (default: 1000000): Time in microseconds to sleep after no messages are found */ - public function run() + public function run(array $options = [], callable $onHandledCallback = null): void { + $options = array_merge([ + 'sleep' => 1000000, + ], $options); + if (\function_exists('pcntl_signal')) { pcntl_signal(SIGTERM, function () { - $this->receiver->stop(); + $this->stop(); }); } - $this->receiver->receive(function (?Envelope $envelope) { - if (null === $envelope) { - if (\function_exists('pcntl_signal_dispatch')) { - pcntl_signal_dispatch(); - } + $onHandled = function (?Envelope $envelope) use ($onHandledCallback) { + if (\function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } - return; + if (null !== $onHandledCallback) { + $onHandledCallback($envelope); } + }; - $this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName)); - - $message = $envelope->getMessage(); - $context = [ - 'message' => $message, - 'class' => \get_class($message), - ]; - - try { - $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp())); - } catch (\Throwable $throwable) { - $shouldRetry = $this->shouldRetry($throwable, $envelope); - - $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRetry)); - - if ($shouldRetry) { - if (null === $this->retryStrategy) { - // not logically allowed, but check just in case - throw new LogicException('Retrying is not supported without a retry strategy.'); - } - - $retryCount = $this->getRetryCount($envelope) + 1; - if (null !== $this->logger) { - $this->logger->error('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]); - } - - // add the delay and retry stamp info + remove ReceivedStamp - $retryEnvelope = $envelope->with(new DelayStamp($this->retryStrategy->getWaitingTime($envelope))) - ->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope))) - ->withoutAll(ReceivedStamp::class); - - // re-send the message - $this->bus->dispatch($retryEnvelope); - // acknowledge the previous message has received - $this->receiver->ack($envelope); - } else { - if (null !== $this->logger) { - $this->logger->critical('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]); - } - - $this->receiver->reject($envelope); - } + while (false === $this->shouldStop) { + $envelopeHandled = false; + foreach ($this->receivers as $receiverName => $receiver) { + $envelopes = $receiver->get(); + + foreach ($envelopes as $envelope) { + $envelopeHandled = true; - if (\function_exists('pcntl_signal_dispatch')) { - pcntl_signal_dispatch(); + $this->handleMessage($envelope, $receiver, $receiverName, $this->retryStrategies[$receiverName] ?? null); + $onHandled($envelope); } - return; + // after handling a single receiver, quit and start the loop again + // this should prevent multiple lower priority receivers from + // blocking too long before the higher priority are checked + if ($envelopeHandled) { + break; + } } - $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName)); + if (false === $envelopeHandled) { + $onHandled(null); - if (null !== $this->logger) { - $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context); + usleep($options['sleep']); } + } + } - $this->receiver->ack($envelope); + private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $receiverName, ?RetryStrategyInterface $retryStrategy) + { + $this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $receiverName)); - if (\function_exists('pcntl_signal_dispatch')) { - pcntl_signal_dispatch(); + $message = $envelope->getMessage(); + $context = [ + 'message' => $message, + 'class' => \get_class($message), + ]; + + try { + $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp())); + } catch (\Throwable $throwable) { + $shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy); + + $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry)); + + if ($shouldRetry) { + if (null === $retryStrategy) { + // not logically allowed, but check just in case + throw new LogicException('Retrying is not supported without a retry strategy.'); + } + + $retryCount = $this->getRetryCount($envelope) + 1; + if (null !== $this->logger) { + $this->logger->error('Retrying {class} - retry #{retryCount}.', $context + ['retryCount' => $retryCount, 'error' => $throwable]); + } + + // add the delay and retry stamp info + remove ReceivedStamp + $retryEnvelope = $envelope->with(new DelayStamp($retryStrategy->getWaitingTime($envelope))) + ->with(new RedeliveryStamp($retryCount, $this->getSenderAlias($envelope))) + ->withoutAll(ReceivedStamp::class); + + // re-send the message + $this->bus->dispatch($retryEnvelope); + // acknowledge the previous message has received + $receiver->ack($envelope); + } else { + if (null !== $this->logger) { + $this->logger->critical('Rejecting {class} (removing from transport).', $context + ['error' => $throwable]); + } + + $receiver->reject($envelope); } - }); + + return; + } + + $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $receiverName)); + + if (null !== $this->logger) { + $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context); + } + + $receiver->ack($envelope); + } + + public function stop(): void + { + $this->shouldStop = true; } private function dispatchEvent($event) @@ -149,17 +181,17 @@ private function dispatchEvent($event) $this->eventDispatcher->dispatch($event); } - private function shouldRetry(\Throwable $e, Envelope $envelope): bool + private function shouldRetry(\Throwable $e, Envelope $envelope, ?RetryStrategyInterface $retryStrategy): bool { if ($e instanceof UnrecoverableMessageHandlingException) { return false; } - if (null === $this->retryStrategy) { + if (null === $retryStrategy) { return false; } - return $this->retryStrategy->isRetryable($envelope); + return $retryStrategy->isRetryable($envelope); } private function getRetryCount(Envelope $envelope): int diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php new file mode 100644 index 0000000000000..77e848898234d --- /dev/null +++ b/src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php @@ -0,0 +1,61 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Worker; + +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\WorkerInterface; + +/** + * @author Simon Delicata + * + * @experimental in 4.3 + */ +class StopWhenMemoryUsageIsExceededWorker implements WorkerInterface +{ + private $decoratedWorker; + private $memoryLimit; + private $logger; + private $memoryResolver; + + public function __construct(WorkerInterface $decoratedWorker, int $memoryLimit, LoggerInterface $logger = null, callable $memoryResolver = null) + { + $this->decoratedWorker = $decoratedWorker; + $this->memoryLimit = $memoryLimit; + $this->logger = $logger; + $this->memoryResolver = $memoryResolver ?: function () { + return \memory_get_usage(); + }; + } + + public function run(array $options = [], callable $onHandledCallback = null): void + { + $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback) { + if (null !== $onHandledCallback) { + $onHandledCallback($envelope); + } + + $memoryResolver = $this->memoryResolver; + if ($memoryResolver() > $this->memoryLimit) { + $this->stop(); + if (null !== $this->logger) { + $this->logger->info('Worker stopped due to memory limit of {limit} exceeded', ['limit' => $this->memoryLimit]); + } + } + }); + } + + public function stop(): void + { + $this->decoratedWorker->stop(); + } +} diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php new file mode 100644 index 0000000000000..c72e2cc954a78 --- /dev/null +++ b/src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php @@ -0,0 +1,58 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Worker; + +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\WorkerInterface; + +/** + * @author Samuel Roze + * + * @experimental in 4.3 + */ +class StopWhenMessageCountIsExceededWorker implements WorkerInterface +{ + private $decoratedWorker; + private $maximumNumberOfMessages; + private $logger; + + public function __construct(WorkerInterface $decoratedWorker, int $maximumNumberOfMessages, LoggerInterface $logger = null) + { + $this->decoratedWorker = $decoratedWorker; + $this->maximumNumberOfMessages = $maximumNumberOfMessages; + $this->logger = $logger; + } + + public function run(array $options = [], callable $onHandledCallback = null): void + { + $receivedMessages = 0; + + $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) { + if (null !== $onHandledCallback) { + $onHandledCallback($envelope); + } + + if (null !== $envelope && ++$receivedMessages >= $this->maximumNumberOfMessages) { + $this->stop(); + if (null !== $this->logger) { + $this->logger->info('Worker stopped due to maximum count of {count} exceeded', ['count' => $this->maximumNumberOfMessages]); + } + } + }); + } + + public function stop(): void + { + $this->decoratedWorker->stop(); + } +} diff --git a/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php b/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php new file mode 100644 index 0000000000000..3a4dcf859d859 --- /dev/null +++ b/src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php @@ -0,0 +1,59 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Worker; + +use Psr\Log\LoggerInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\WorkerInterface; + +/** + * @author Simon Delicata + * + * @experimental in 4.3 + */ +class StopWhenTimeLimitIsReachedWorker implements WorkerInterface +{ + private $decoratedWorker; + private $timeLimitInSeconds; + private $logger; + + public function __construct(WorkerInterface $decoratedWorker, int $timeLimitInSeconds, LoggerInterface $logger = null) + { + $this->decoratedWorker = $decoratedWorker; + $this->timeLimitInSeconds = $timeLimitInSeconds; + $this->logger = $logger; + } + + public function run(array $options = [], callable $onHandledCallback = null): void + { + $startTime = microtime(true); + $endTime = $startTime + $this->timeLimitInSeconds; + + $this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $endTime) { + if (null !== $onHandledCallback) { + $onHandledCallback($envelope); + } + + if ($endTime < microtime(true)) { + $this->stop(); + if (null !== $this->logger) { + $this->logger->info('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => $this->timeLimitInSeconds]); + } + } + }); + } + + public function stop(): void + { + $this->decoratedWorker->stop(); + } +} diff --git a/src/Symfony/Component/Messenger/WorkerInterface.php b/src/Symfony/Component/Messenger/WorkerInterface.php new file mode 100644 index 0000000000000..a4b3566871cb2 --- /dev/null +++ b/src/Symfony/Component/Messenger/WorkerInterface.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +/** + * Interface for Workers that handle messages from transports. + * + * @experimental in 4.3 + * + * @author Ryan Weaver + */ +interface WorkerInterface +{ + /** + * Receive the messages and dispatch them to the bus. + * + * The $onHandledCallback will be passed the Envelope that was just + * handled or null if nothing was handled. + * + * @param mixed[] $options options used to control worker behavior + */ + public function run(array $options = [], callable $onHandledCallback = null): void; + + /** + * Stop receiving messages. + */ + public function stop(): void; +}