diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 20a8fff7b4b5c..03b4864520d4d 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -79,6 +79,7 @@ CHANGELOG * Added a `SetupTransportsCommand` command to setup the transports * Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager) * [BC BREAK] The `getConnectionConfiguration` method on Amqp's `Connection` has been removed. + * [BC BREAK] A `HandlerFailedException` exception will be thrown if one or more handler fails. 4.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Exception/HandlerFailedException.php b/src/Symfony/Component/Messenger/Exception/HandlerFailedException.php new file mode 100644 index 0000000000000..d0abb7cd4f156 --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/HandlerFailedException.php @@ -0,0 +1,52 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +use Symfony\Component\Messenger\Envelope; + +class HandlerFailedException extends RuntimeException +{ + private $exceptions; + private $envelope; + + /** + * @param \Throwable[] $exceptions + */ + public function __construct(Envelope $envelope, array $exceptions) + { + $firstFailure = current($exceptions); + + parent::__construct( + 1 === \count($exceptions) + ? $firstFailure->getMessage() + : sprintf('%d handlers failed. First failure is: "%s"', \count($exceptions), $firstFailure->getMessage()), + $firstFailure->getCode(), + $firstFailure + ); + + $this->envelope = $envelope; + $this->exceptions = $exceptions; + } + + public function getEnvelope(): Envelope + { + return $this->envelope; + } + + /** + * @return \Throwable[] + */ + public function getNestedExceptions(): array + { + return $this->exceptions; + } +} diff --git a/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php index ed47ac6389128..4fc8f6dfcfdd0 100644 --- a/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php @@ -14,6 +14,7 @@ use Psr\Log\LoggerAwareTrait; use Psr\Log\NullLogger; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\NoHandlerForMessageException; use Symfony\Component\Messenger\Handler\HandlersLocatorInterface; use Symfony\Component\Messenger\Stamp\HandledStamp; @@ -52,10 +53,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope 'class' => \get_class($message), ]; + $exceptions = []; foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) { - $handledStamp = HandledStamp::fromCallable($handler, $handler($message), \is_string($alias) ? $alias : null); - $envelope = $envelope->with($handledStamp); - $this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]); + $alias = \is_string($alias) ? $alias : null; + + if ($this->messageHasAlreadyBeenHandled($envelope, $handler, $alias)) { + continue; + } + + try { + $handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias); + $envelope = $envelope->with($handledStamp); + $this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]); + } catch (\Throwable $e) { + $exceptions[] = $e; + } } if (null === $handler) { @@ -66,6 +78,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope $this->logger->info('No handler for message "{class}"', $context); } + if (\count($exceptions)) { + throw new HandlerFailedException($envelope, $exceptions); + } + return $stack->next()->handle($envelope, $stack); } + + private function messageHasAlreadyBeenHandled(Envelope $envelope, callable $handler, ?string $alias): bool + { + $some = array_filter($envelope + ->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) { + return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) && + $stamp->getHandlerAlias() === $alias; + }); + + return \count($some) > 0; + } } diff --git a/src/Symfony/Component/Messenger/Stamp/HandledStamp.php b/src/Symfony/Component/Messenger/Stamp/HandledStamp.php index 0cd480765ecb8..491aa64472ebd 100644 --- a/src/Symfony/Component/Messenger/Stamp/HandledStamp.php +++ b/src/Symfony/Component/Messenger/Stamp/HandledStamp.php @@ -40,33 +40,38 @@ public function __construct($result, string $callableName, string $handlerAlias /** * @param mixed $result The returned value of the message handler */ - public static function fromCallable(callable $handler, $result, string $handlerAlias = null): self + public static function fromCallable(callable $handler, $result, ?string $handlerAlias = null): self + { + return new self($result, self::getNameFromCallable($handler), $handlerAlias); + } + + public static function getNameFromCallable(callable $handler): string { if (\is_array($handler)) { if (\is_object($handler[0])) { - return new self($result, \get_class($handler[0]).'::'.$handler[1], $handlerAlias); + return \get_class($handler[0]).'::'.$handler[1]; } - return new self($result, $handler[0].'::'.$handler[1], $handlerAlias); + return $handler[0].'::'.$handler[1]; } if (\is_string($handler)) { - return new self($result, $handler, $handlerAlias); + return $handler; } if ($handler instanceof \Closure) { $r = new \ReflectionFunction($handler); if (false !== strpos($r->name, '{closure}')) { - return new self($result, 'Closure', $handlerAlias); + return 'Closure'; } if ($class = $r->getClosureScopeClass()) { - return new self($result, $class->name.'::'.$r->name, $handlerAlias); + return $class->name.'::'.$r->name; } - return new self($result, $r->name, $handlerAlias); + return $r->name; } - return new self($result, \get_class($handler).'::__invoke', $handlerAlias); + return \get_class($handler).'::__invoke'; } /** diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessageHandlerFailingFirstTimes.php b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessageHandlerFailingFirstTimes.php new file mode 100644 index 0000000000000..2e9744538473c --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessageHandlerFailingFirstTimes.php @@ -0,0 +1,39 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Fixtures; + +class DummyMessageHandlerFailingFirstTimes +{ + private $remainingFailures; + + private $called = 0; + + public function __construct(int $throwExceptionOnFirstTries = 0) + { + $this->remainingFailures = $throwExceptionOnFirstTries; + } + + public function __invoke(DummyMessage $message) + { + if ($this->remainingFailures > 0) { + --$this->remainingFailures; + throw new \Exception('Handler should throw Exception.'); + } + + ++$this->called; + } + + public function getTimesCalledWithoutThrowing(): int + { + return $this->called; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php index 953124fb9b77d..cd15957f1b0f2 100644 --- a/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Tests\Middleware; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Handler\HandlersLocator; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; use Symfony\Component\Messenger\Middleware\StackMiddleware; @@ -40,7 +41,7 @@ public function testItCallsTheHandlerAndNextMiddleware() /** * @dataProvider itAddsHandledStampsProvider */ - public function testItAddsHandledStamps(array $handlers, array $expectedStamps) + public function testItAddsHandledStamps(array $handlers, array $expectedStamps, bool $nextIsCalled) { $message = new DummyMessage('Hey'); $envelope = new Envelope($message); @@ -49,7 +50,11 @@ public function testItAddsHandledStamps(array $handlers, array $expectedStamps) DummyMessage::class => $handlers, ])); - $envelope = $middleware->handle($envelope, $this->getStackMock()); + try { + $envelope = $middleware->handle($envelope, $this->getStackMock($nextIsCalled)); + } catch (HandlerFailedException $e) { + $envelope = $e->getEnvelope(); + } $this->assertEquals($expectedStamps, $envelope->all(HandledStamp::class)); } @@ -64,17 +69,22 @@ public function itAddsHandledStampsProvider() $second->method('__invoke')->willReturn(null); $secondClass = \get_class($second); + $failing = $this->createPartialMock(\stdClass::class, ['__invoke']); + $failing->method('__invoke')->will($this->throwException(new \Exception('handler failed.'))); + yield 'A stamp is added' => [ [$first], [new HandledStamp('first result', $firstClass.'::__invoke')], + true, ]; yield 'A stamp is added per handler' => [ - [$first, $second], + ['first' => $first, 'second' => $second], [ - new HandledStamp('first result', $firstClass.'::__invoke'), - new HandledStamp(null, $secondClass.'::__invoke'), + new HandledStamp('first result', $firstClass.'::__invoke', 'first'), + new HandledStamp(null, $secondClass.'::__invoke', 'second'), ], + true, ]; yield 'Yielded locator alias is used' => [ @@ -83,6 +93,24 @@ public function itAddsHandledStampsProvider() new HandledStamp('first result', $firstClass.'::__invoke', 'first_alias'), new HandledStamp(null, $secondClass.'::__invoke'), ], + true, + ]; + + yield 'It tries all handlers' => [ + ['first' => $first, 'failing' => $failing, 'second' => $second], + [ + new HandledStamp('first result', $firstClass.'::__invoke', 'first'), + new HandledStamp(null, $secondClass.'::__invoke', 'second'), + ], + false, + ]; + + yield 'It ignores duplicated handler' => [ + [$first, $first], + [ + new HandledStamp('first result', $firstClass.'::__invoke'), + ], + true, ]; } diff --git a/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php new file mode 100644 index 0000000000000..00346bbc703c2 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php @@ -0,0 +1,63 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Handler\HandlersLocator; +use Symfony\Component\Messenger\MessageBus; +use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; +use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; +use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; +use Symfony\Component\Messenger\Stamp\SentStamp; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; +use Symfony\Component\Messenger\Transport\Sender\SendersLocator; +use Symfony\Component\Messenger\Worker; + +class RetryIntegrationTest extends TestCase +{ + public function testRetryMechanism() + { + $apiMessage = new DummyMessage('API'); + + $receiver = $this->createMock(ReceiverInterface::class); + $receiver->method('get') + ->willReturn([ + new Envelope($apiMessage, [ + new SentStamp('Some\Sender', 'sender_alias'), + ]), + ]); + + $senderLocator = new SendersLocator([], ['*' => true]); + + $handler = new DummyMessageHandlerFailingFirstTimes(); + $throwingHandler = new DummyMessageHandlerFailingFirstTimes(1); + $handlerLocator = new HandlersLocator([ + DummyMessage::class => [ + 'handler' => $handler, + 'throwing' => $throwingHandler, + ], + ]); + + $bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]); + + $worker = new Worker(['receiverName' => $receiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]); + $worker->run([], function () use ($worker) { + $worker->stop(); + }); + + $this->assertSame(1, $handler->getTimesCalledWithoutThrowing()); + $this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing()); + } +} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 1e7f539f6ce40..39fa5a910bd8c 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException; use Symfony\Component\Messenger\Retry\RetryStrategyInterface; @@ -123,6 +124,10 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, try { $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp())); } catch (\Throwable $throwable) { + if ($throwable instanceof HandlerFailedException) { + $envelope = $throwable->getEnvelope(); + } + $shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy); $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry));