10000 Adding global retry support, events & more to messenger transport · symfony/symfony@a989384 · GitHub
[go: up one dir, main page]

Skip to content

Commit a989384

Browse files
weaverryansroze
andcommitted
Adding global retry support, events & more to messenger transport
Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>
1 parent 59f20ad commit a989384

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1603
-229
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,6 +1111,20 @@ function ($a) {
11111111
->prototype('variable')
11121112
->end()
11131113
->end()
1114+
->arrayNode('retry_strategy')
1115+
->addDefaultsIfNotSet()
1116+
->validate()
1117+
->ifTrue(function ($v) { return null !== $v['service'] && (isset($v['max_retries']) || isset($v['delay']) || isset($v['multiplier']) || isset($v['max_delay'])); })
1118+
->thenInvalid('"service" cannot be used along with the other retry_strategy options.')
1119+
->end()
1120+
->children()
1121+
->scalarNode('service')->defaultNull()->info('Service id to override the retry strategy entirely')->end()
1122+
->integerNode('max_retries')->defaultValue(3)->min(0)->end()
1123+
->integerNode('delay')->defaultValue(1000)->min(0)->info('Time in ms to delay (or the initial value when multiplier is used)')->end()
1124+
->floatNode('multiplier')->defaultValue(2)->min(1)->info('If greater than 1, delay will grow exponentially for each retry: this delay = (delay * (multiple ^ retries))')->end()
1125+
->integerNode('max_delay')->defaultValue(0)->min(0)->info('Max time in ms that a retry should ever be delayed (0 = infinite)')->end()
1126+
->end()
1127+
->end()
11141128
->end()
11151129
->end()
11161130
->end()

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1653,6 +1653,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16531653
}
16541654

16551655
$senderAliases = [];
1656+
$transportRetryReferences = [];
16561657
foreach ($config['transports'] as $name => $transport) {
16571658
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
16581659
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".');
@@ -1665,6 +1666,21 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16651666
;
16661667
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
16671668
$senderAliases[$name] = $transportId;
1669+
1670+
if (null !== $transport['retry_strategy']['service']) {
1671+
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
1672+
} else {
1673+
$retryServiceId = sprintf('messenger.retry.multiplier_retry_strategy.%s', $name);
1674+
$retryDefinition = new ChildDefinition('messenger.retry.abstract_multiplier_retry_strategy');
1675+
$retryDefinition
1676+
->replaceArgument(0, $transport['retry_strategy']['max_retries'])
1677+
->replaceArgument(1, $transport['retry_strategy']['delay'])
1678+
->replaceArgument(2, $transport['retry_strategy']['multiplier'])
1679+
->replaceArgument(3, $transport['retry_strategy']['max_delay']);
1680+
$container->setDefinition($retryServiceId, $retryDefinition);
1681+
1682+
$transportRetryReferences[$name] = new Reference($retryServiceId);
1683+
}
16681684
}
16691685

16701686
$messageToSendersMapping = [];
@@ -1686,6 +1702,9 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16861702
->replaceArgument(0, $messageToSendersMapping)
16871703
->replaceArgument(1, $messagesToSendAndHandle)
16881704
;
1705+
1706+
$container->getDefinition('messenger.retry_strategy_locator')
1707+
->replaceArgument(0, $transportRetryReferences);
16891708
}
16901709

16911710
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,12 @@
8282
<argument type="service" id="logger" on-invalid="null" />
8383
<argument type="collection" /> <!-- Receiver names -->
8484
<argument type="collection" /> <!-- Message bus names -->
85+
<argument type="service" id="messenger.retry_strategy_locator" />
86+
<argument type="service" id="event_dispatcher" />
8587

88+
<tag name="console.command" command="messenger:consume" />
8689
<tag name="console.command" command="messenger:consume-messages" />
90+
<tag name="monolog.logger" channel="messenger" />
8791
</service>
8892

8993
<service id="console.command.messenger_debug" class="Symfony\Component\Messenger\Command\DebugCommand">

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,18 @@
6464
<tag name="messenger.transport_factory" />
6565
<argument type="service" id="messenger.transport.serializer" />
6666
</service>
67+
68+
<!-- retry -->
69+
<service id="messenger.retry_strategy_locator">
70+
<tag name="container.service_locator" />
71+
<argument type="collection" />
72+
</service>
73+
74+
<service id="messenger.retry.abstract_multiplier_retry_strategy" class="Symfony\Component\Messenger\Retry\MultiplierRetryStrategy" abstract="true">
75+
<argument /> <!-- max retries -->
76+
<argument /> <!-- delay ms -->
77+
<argument /> <!-- multiplier -->
78+
<argument /> <!-- max delay ms -->
79+
</service>
6780
</services>
6881
</container>

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,34 @@ CHANGELOG
33

44
4.3.0
55
-----
6-
6+
7+
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
8+
`ack()` and `reject()`.
9+
* [BC BREAK] Error handling was moved from the receivers into
10+
`Worker`. Implementations of `ReceiverInterface::handle()`
11+
should now allow all exceptions to be thrown, except for transport
12+
exceptions. They should also not retry (e.g. if there's a queue,
13+
remove from the queue) if there is a problem decoding the message.
14+
* [BC BREAK] `RejectMessageExceptionInterface` was removed and replaced
15+
by `Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException`,
16+
which has the same behavior: a message will not be retried
17+
* The default command name for `ConsumeMessagesCommand` was
18+
changed from `messenger:consume-messages` to `messenger:consume`
19+
* `ConsumeMessagesCommand` has two new optional constructor arguments
20+
* `Worker` has 4 new option constructor arguments.
21+
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
22+
receiver no longer needs to call this.
23+
* The `AmqpSender` will now retry messages using a dead-letter exchange
24+
and delayed queues, instead of retrying via `nack()`
25+
* Senders now receive the `Envelope` with the `SentStamp` on it. Previously,
26+
the `Envelope` was passed to the sender and *then* the `SentStamp`
27+
was added.
28+
* `SerializerInterface` implementations should now throw a
29+
`Symfony\Component\Messenger\Exception\MessageDecodingFailedException`
30+
if `decode()` fails for any reason.
31+
* [BC BREAK] The default `Serializer` will now throw a
32+
`MessageDecodingFailedException` if `decode()` fails, instead
33+
of the underlying exceptions from the Serializer component.
734
* Added `PhpSerializer` which uses PHP's native `serialize()` and
835
`unserialize()` to serialize messages to a transport
936
* [BC BREAK] If no serializer were passed, the default serializer

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Component\Console\Input\InputOption;
2121
use Symfony\Component\Console\Output\OutputInterface;
2222
use Symfony\Component\Console\Style\SymfonyStyle;
23+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
2324
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
2425
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
2526
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@@ -32,21 +33,25 @@
3233
*/
3334
class ConsumeMessagesCommand extends Command
3435
{
35-
protected static $defaultName = 'messenger:consume-messages';
36+
protected static $defaultName = 'messenger:consume';
3637

3738
private $busLocator;
3839
private $receiverLocator;
3940
private $logger;
4041
private $receiverNames;
4142
private $busNames;
43+
private $retryStrategyLocator;
44+
private $eventDispatcher;
4245

43-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [])
46+
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
4447
{
4548
$this->busLocator = $busLocator;
4649
$this->receiverLocator = $receiverLocator;
4750
$this->logger = $logger;
4851
$this->receiverNames = $receiverNames;
4952
$this->busNames = $busNames;
53+
$this->retryStrategyLocator = $retryStrategyLocator;
54+
$this->eventDispatcher = $eventDispatcher;
5055

5156
parent::__construct();
5257
}
@@ -132,6 +137,12 @@ protected function interact(InputInterface $input, OutputInterface $output)
132137
*/
133138
protected function execute(InputInterface $input, OutputInterface $output): void
134139
{
140+
if (false !== strpos($input->getFirstArgument(), ':consume-')) {
141+
$message = 'The use of the "messenger:consume-messages" command is deprecated since version 4.3 and will be removed in 5.0. Use "messenger:consume" instead.';
142+
@trigger_error($message, E_USER_DEPRECATED);
143+
$output->writeln(sprintf('<comment>%s</comment>', $message));
144+
}
145+
135146
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
136147
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
137148
}
@@ -140,8 +151,13 @@ protected function execute(InputInterface $input, OutputInterface $output): void
140151
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
141152
}
142153

154+
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
155+
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
156+
}
157+
143158
$receiver = $this->receiverLocator->get($receiverName);
144159
$bus = $this->busLocator->get($busName);
160+
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
145161

146162
$stopsWhen = [];
147163
if ($limit = $input->getOption('limit')) {
@@ -174,7 +190,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
174190
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
175191
}
176192

177-
$worker = new Worker($receiver, $bus);
193+
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
178194
$worker->run();
179195
}
180196

src/Symfony/Component/Messenger/Envelope.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ public function with(StampInterface ...$stamps): self
5454
return $cloned;
5555
}
5656

57+
/**
58+
* @return Envelope a new Envelope instance without any stamps of the given class
59+
*/
60+
public function withoutAll(string $stampFqcn): self
61+
{
62+
$cloned = clone $this;
63+
64+
unset($cloned->stamps[$stampFqcn]);
65+
66+
return $cloned;
67+
}
68+
5769
public function last(string $stampFqcn): ?StampInterface
5870
{
5971
return isset($this->stamps[$stampFqcn]) ? end($this->stamps[$stampFqcn]) : null;
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
use Symfony\Component\EventDispatcher\Event;
15+
use Symfony\Component\Messenger\Envelope;
16+
17+
/**
18+
* @experimental in 4.3
19+
*/
20+
abstract class AbstractWorkerMessageEvent extends Event
21+
{
22+
private $envelope;
23+
private $receiverName;
24+
25+
public function __construct(Envelope $envelope, string $receiverName)
26+
{
27+
$this->envelope = $envelope;
28+
$this->receiverName = $receiverName;
29+
}
30+
31+
public function getEnvelope(): Envelope
32+
{
33+
return $this->envelope;
34+
}
35+
36+
/**
37+
* Returns a unique identifier for transport receiver this message was received from.
38+
*/
39+
public function getReceiverName(): string
40+
{
41+
return $this->receiverName;
42+
}
43+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
16+
/**
17+
* Dispatched when a message was received from a transport and handling failed.
18+
*
19+
* The event name is the class name.
20+
*
21+
* @experimental in 4.3
22+
*/
23+
class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
24+
{
25+
private $throwable;
26+
private $willRetry;
27+
28+
public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry)
29+
{
30+
$this->throwable = $error;
31+
$this->willRetry = $willRetry;
32+
33+
parent::__construct($envelope, $receiverName);
34+
}
35+
36+
public function getThrowable(): \Throwable
37+
{
38+
return $this->throwable;
39+
}
40+
41+
public function willRetry(): bool
42+
{
43+
return $this->willRetry;
44+
}
45+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
/**
15+
* Dispatched after a message was received from a transport and successfully handled.
16+
*
17+
* The event name is the class name.
18+
*
19+
* @experimental in 4.3
20+
*/
21+
class WorkerMessageHandledEvent extends AbstractWorkerMessageEvent
22+
{
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
/**
15+
* Dispatched when a message was received from a transport but before sent to the bus.
16+
*
17+
* The event name is the class name.
18+
*
19+
* @experimental in 4.3
20+
*/
21+
class WorkerMessageReceivedEvent extends AbstractWorkerMessageEvent
22+
{
23+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* Thrown when a message cannot be decoded in a serializer.
16+
*
17+
* @experimental in 4.3
18+
*/
19+
class MessageDecodingFailedException extends \InvalidArgumentException implements ExceptionInterface
20+
{
21+
}

0 commit comments

Comments
 (0)
0