8000 Adding global retry support, events & more to messenger transport · symfony/symfony@a989384 · GitHub
[go: up one dir, main page]

Skip to content

Commit a989384

Browse files
weaverryansroze
andcommitted
Adding global retry support, events & more to messenger transport
Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>
1 parent 59f20ad commit a989384
Copy full SHA for a989384

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1603
-229
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,6 +1111,20 @@ function ($a) {
11111111
->prototype('variable')
11121112
->end()
11131113
->end()
1114+
->arrayNode('retry_strategy')
1115+
->addDefaultsIfNotSet()
1116+
->validate()
1117+
->ifTrue(function ($v) { return null !== $v['service'] && (isset($v['max_retries']) || isset($v['delay']) || isset($v['multiplier']) || isset($v['max_delay'])); })
1118+
->thenInvalid('"service" cannot be used along with the other retry_strategy options.')
1119+
->end()
1120+
->children()
1121+
->scalarNode('service')->defaultNull()->info('Service id to override the retry strategy entirely')->end()
1122+
->integerNode('max_retries')->defaultValue(3)->min(0)->end()
1123+
->integerNode('delay')->defaultValue(1000)->min(0)->info('Time in ms to delay (or the initial value when multiplier is used)')->end()
1124+
->floatNode('multiplier')->defaultValue(2)->min(1)->info('If greater than 1, delay will grow exponentially for each retry: this delay = (delay * (multiple ^ retries))')->end()
1125+
->integerNode('max_delay')->defaultValue(0)->min(0)->info('Max time in ms that a retry should ever be delayed (0 = infinite)')->end()
1126+
->end()
1127+
->end()
11141128
->end()
11151129
->end()
11161130
->end()

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1653,6 +1653,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16531653
}
16541654

16551655
$senderAliases = [];
1656+
$transportRetryReferences = [];
16561657
foreach ($config['transports'] as $name => $transport) {
16571658
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
16581659
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".');
@@ -1665,6 +1666,21 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16651666
;
16661667
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
16671668
$senderAliases[$name] = $transportId;
1669+
1670+
if (null !== $transport['retry_strategy']['service']) {
1671+
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
1672+
} else {
1673+
$retryServiceId = sprintf('messenger.retry.multiplier_retry_strategy.%s', $name);
1674+
$retryDefinition = new ChildDefinition('messenger.retry.abstract_multiplier_retry_strategy');
1675+
$retryDefinition
1676+
->replaceArgument(0, $transport['retry_strategy']['max_retries'])
1677+
->replaceArgument(1, $transport['retry_strategy']['delay'])
1678+
->replaceArgument(2, $transport['retry_strategy']['multiplier'])
1679+
->replaceArgument(3, $transport['retry_strategy']['max_delay']);
1680+
$container->setDefinition($retryServiceId, $retryDefinition);
1681+
1682+
$transportRetryReferences[$name] = new Reference($retryServiceId);
1683+
}
16681684
}
16691685

16701686
$messageToSendersMapping = [];
@@ -1686,6 +1702,9 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16861702
->replaceArgument(0, $messageToSendersMapping)
16871703
->replaceArgument(1, $messagesToSendAndHandle)
16881704
;
1705+
1706+
$container->getDefinition('messenger.retry_strategy_locator')
1707+
->replaceArgument(0, $transportRetryReferences);
16891708
}
16901709

16911710
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,12 @@
8282
<argument type="service" id="logger" on-invalid="null" />
8383
<argument type="collection" /> <!-- Receiver names -->
8484
<argument type="collection" /> <!-- Message bus names -->
85+
<argument type="service" id="messenger.retry_strategy_locator" />
86+
<argument type="service" id="event_dispatcher" />
8587

88+
<tag name="console.command" command="messenger:consume" />
8689
<tag name="console.command" command="messenger:consume-messages" />
90+
<tag name="monolog.logger" channel="messenger" />
8791
</service>
8892

8993
<service id="console.command.messenger_debug" class="Symfony\Component\Messenger\Command\DebugCommand">

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,18 @@
6464
<tag name="messenger.transport_factory" />
6565
<argument type="service" id="messenger.transport.serializer" />
6666
</service>
67+
68+
<!-- retry -->
69+
<service id="messenger.retry_strategy_locator">
70+
<tag name="container.service_locator" />
71+
<argument type="collection" />
72+
</service>
73+
74+
<service id="messenger.retry.abstract_multiplier_retry_strategy" class="Symfony\Component\Messenger\Retry\MultiplierRetryStrategy" abstract="true">
75+
<argument /> <!-- max retries -->
76+
<argument /> <!-- delay ms -->
77+
<argument /> <!-- multiplier -->
78+
<argument /> <!-- max delay ms -->
79+
</service>
6780
</services>
6881
</container>

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,34 @@ CHANGELOG
33

44
4.3.0
55
-----
6-
6+
7+
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
8+
`ack()` and `reject()`.
9+
* [BC BREAK] Error handling was moved from the receivers into
10+
`Worker`. Implementations of `ReceiverInterface::handle()`
11+
should now allow all exceptions to be thrown, except for transport
12+
exceptions. They should also not retry (e.g. if there's a queue,
13+
remove from the queue) if there is a problem decoding the message.
14+
* [BC BREAK] `RejectMessageExceptionInterface` was removed and replaced
15+
by `Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException`,
16+
which has the same behavior: a message will not be retried
17+
* The default command name for `ConsumeMessagesCommand` was
18+
changed from `messenger:consume-messages` to `messenger:consume`
19+
* `ConsumeMessagesCommand` has two new optional constructor arguments
20+
* `Worker` has 4 new option constructor arguments.
21+
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
22+
receiver no longer needs to call this.
23+
* The `AmqpSender` will now retry messages using a dead-letter exchange
24+
and delayed queues, instead of retrying via `nack()`
25+
* Senders now receive the `Envelope` with the `SentStamp` on it. Previously,
26+
the `Envelope` was passed to the sender and *then* the `SentStamp`
27+
was added.
28+
* `SerializerInterface` implementations should now throw a
29+
`Symfony\Component\Messenger\Exception\MessageDecodingFailedException`
30+
if `decode()` fails for any reason.
31+
* [BC BREAK] The default `Serializer` will now throw a
32+
`MessageDecodingFailedException` if `decode()` fails, instead
33+
of the underlying exceptions from the Serializer component.
734
* Added `PhpSerializer` which uses PHP's native `serialize()` and
835
`unserialize()` to serialize messages to a transport
936
* [BC BREAK] If no serializer were passed, the default serializer

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Component\Console\Input\InputOption;
2121
use Symfony\Component\Console\Output\OutputInterface;
2222
use Symfony\Component\Console\Style\SymfonyStyle;
23+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
2324
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
2425
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
2526
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@@ -32,21 +33,25 @@
3233
*/
3334
class ConsumeMessagesCommand extends Command
3435
{
35-
protected static $defaultName = 'messenger:consume-messages';
36+
protected static $defaultName = 'messenger:consume';
3637

3738
private $busLocator;
3839
private $receiverLocator;
3940
private $logger;
4041
private $receiverNames;
4142
private $busNames;
43+
private $retryStrategyLocator;
44+
private $eventDispatcher;
4245

43-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [])
46+
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
4447
{
4548
$this->busLocator = $busLocator;
4649
$this->receiverLocator = $receiverLocator;
4750
$this->logger = $logger;
4851
$this->receiverNames = $receiverNames;
4952
$this->busNames = $busNames;
53+
$this->retryStrategyLocator = $retryStrategyLocator;
54+
$this->eventDispatcher = $eventDispatcher;
5055

5156
parent::__construct();
5257
}
@@ -132,6 +137,12 @@ protected function interact(InputInterface $input, OutputInterface $output)
132137
*/
133138
protected function execute(InputInterface $input, OutputInterface $output): void
134139
{
140+
if (false !== strpos($input->getFirstArgument(), ':consume-')) {
141+
$message = 'The use of the "messenger:consume-messages" command is deprecated since version 4.3 and will be removed in 5.0. Use "messenger:consume" instead.';
142+
@trigger_error($message, E_USER_DEPRECATED);
143+
$output->writeln(sprintf('<comment>%s</comment>', $message));
144+
}
145+
135146
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
136147
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
137148
}
@@ -140,8 +151,13 @@ protected function execute(InputInterface $input, OutputInterface $output): void
140151
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
141152
}
142153

154+
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
155+
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
156+
}
157+
143158
$receiver = $this->receiverLocator->get($receiverName);
144159
$bus = $this->busLocator->get($busName);
160+
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
145161

146162
$stopsWhen = [];
147163
if ($limit = $input->getOption('limit')) {
@@ -174,7 +190,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
174190
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
175191
}
176192

177-
$worker = new Worker($receiver, $bus);
193+
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
178194
$worker->run();
179195
}
180196

src/Symfony/Component/Messenger/Envelope.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ public function with(StampInterface ...$stamps): self
5454
return $cloned;
5555
}
5656

57+
/**
58+
* @return Envelope a new Envelope instance without any stamps of the given class
59+
*/
60+
public function withoutAll(string $stampFqcn): self
61+
{
62+
$cloned = clone $this;
63+
64+
unset($cloned->stamps[$stampFqcn]);
65+
66+
return $cloned;
67+
}
68+
5769
public function last(string $stampFqcn): ?StampInterface
5870
{
5971
return isset($this->stamps[$stampFqcn]) ? end($this->stamps[$stampFqcn]) : null;
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
use Symfony\Component\EventDispatcher\Event;
15+
use Symfony\Component\Messenger\Envelope;
16+
17+
/**
18+
* @experimental in 4.3
19+
*/
20+
abstract class AbstractWorkerMessageEvent extends Event
21+
{
22+
private $envelope;
23+
private $receiverName;
24+
25+
public function __construct(Envelope $envelope, string $receiverName)
26+
{
27+
$this->envelope = $envelope;
28+
$this->receiverName = $receiverName;
29+
}
30+
31+
public function getEnvelope(): Envelope
32+
{
33+
return $this->envelope;
34+
}
35+
36+
/**
37+
* Returns a unique identifier for transport receiver this message was received from.
38+
*/
39+
public function getReceiverName(): string
40+
{
41+
return $this->receiverName;
42+
}
43+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
16+
/**
17+
* Dispatched when a message was received from a transport and handling failed.
18+
*
19+
* The event name is the class name.
20+
*
21+
* @experimental in 4.3
22+
*/
23+
class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
24+
{
25+
private $throwable;
26+
private $willRetry;
27+
28+
public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry)
29+
{
30+
$this->throwable = $error;
31+
$this->willRetry = $willRetry;
32+
33+
parent::__construct($envelope, $receiverName);
34+
}
35+
36+
public function getThrowable(): \Throwable
37+
{
38+
return $this->throwable;
39+
}
40+
41+
public function willRetry(): bool
42+
{
43+
return $this->willRetry;
44+
}
45+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
/**
15+
* Dispatched after a message was received from a transport and successfully handled.
16+
*
17+
* The event name is the class name.
18+
*
19+
* @experimental in 4.3
20+
*/
21+
class WorkerMessageHandledEvent extends AbstractWorkerMessageEvent
22+
{
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
/**
15+
* Dispatched when a message was received from a transport but before sent to the bus.
16+
*
17+
* The event name is the class name.
18+
*
19+
* @experimental in 4.3
20+
*/
21+
class WorkerMessageReceivedEvent extends AbstractWorkerMessageEvent
22+
{
23+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* Thrown when a message cannot be decoded in a serializer.
16+
*
17+
* @experimental in 4.3
18+
*/
19+
class MessageDecodingFailedException extends \InvalidArgumentException implements ExceptionInterface
20+
{
21+
}

0 commit comments

Comments
 (0)
0