10000 Support for handling messages after current bus is finished · symfony/symfony@903355f · GitHub
[go: up one dir, main page]

Skip to content

Commit 903355f

Browse files
Nyholmogizanagi
andcommitted
Support for handling messages after current bus is finished
Co-authored-by: Maxime Steinhausser <ogizanagi@users.noreply.github.com>
1 parent 2ff8c19 commit 903355f

File tree

8 files changed

+368
-2
lines changed

8 files changed

+368
-2
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1608,7 +1608,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16081608
}
16091609

16101610
$defaultMiddleware = [
1611-
'before' => [],
1611+
'before' => [['id' => 'dispatch_after_current_bus']],
16121612
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
16131613
];
16141614
foreach ($config['buses'] as $busId => $bus) {

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
</call>
4040
</service>
4141

42+
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
43+
4244
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
4345
<argument type="service" id="validator" />
4446
</service>

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,12 +702,14 @@ public function testMessengerWithMultipleBuses()
702702
$this->assertTrue($container->has('messenger.bus.commands'));
703703
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
704704
$this->assertEquals([
705+
['id' => 'dispatch_after_current_bus'],
705706
['id' => 'send_message'],
706707
['id' => 'handle_message'],
707708
], $container->getParameter('messenger.bus.commands.middleware'));
708709
$this->assertTrue($container->has('messenger.bus.events'));
709710
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
710711
$this->assertEquals([
712+
['id' => 'dispatch_after_current_bus'],
711713
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
712714
['id' => 'send_message'],
713715
['id' => 'handle_message'],

src/Symfony/Bundle/FrameworkBundle/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"symfony/form": "^4.3",
4444
"symfony/expression-language": "~3.4|~4.0",
4545
"symfony/http-client": "^4.3",
46-
"symfony/messenger": "^4.2",
46+
"symfony/messenger": "^4.3",
4747
"symfony/mime": "^4.3",
4848
"symfony/process": "~3.4|~4.0",
4949
"symfony/security-core": "~3.4|~4.0",
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* When handling queued messages from {@link DispatchAfterCurrentBusMiddleware},
16+
* some handlers caused an exception. This exception contains all those handler exceptions.
17+
*
18+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
19+
*/
20+
class DelayedMessageHandlingException extends \RuntimeException implements ExceptionInterface
21+
{
22+
private $exceptions;
23+
24+
public function __construct(array $exceptions)
25+
{
26+
$exceptionMessages = implode(", \n", array_map(
27+
function (\Throwable $e) {
28+
return \get_class($e).': '.$e->getMessage();
29+
},
30+
$exceptions
31+
));
32+
33+
if (1 === \count($exceptions)) {
34+
$message = sprintf("A delayed message handler threw an exception: \n\n%s", $exceptionMessages);
35+
} else {
36+
$message = sprintf("Some delayed message handlers threw an exception: \n\n%s", $exceptionMessages);
37+
}
38+
39+
$this->exceptions = $exceptions;
40+
41+
parent::__construct($message, 0, $exceptions[0]);
42+
}
43+
44+
public function getExceptions(): array
45+
{
46+
return $this->exceptions;
47+
}
48+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
16+
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
17+
18+
/**
19+
* Allow to configure messages to be handled after the current bus is finished.
20+
*
21+
* I.e, messages dispatched from a handler with a DispatchAfterCurrentBus stamp
22+
* will actually be handled once the current message being dispatched is fully
23+
* handled.
24+
*
25+
* For instance, using this middleware before the DoctrineTransactionMiddleware
26+
* means sub-dispatched messages with a DispatchAfterCurrentBus stamp would be
27+
* handled after the Doctrine transaction has been committed.
28+
*
29+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
30+
*/
31+
class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface
32+
{
33+
/**
34+
* @var QueuedEnvelope[] A queue of messages and next middleware
35+
*/
36+
private $queue = [];
37+
38+
/**
39+
* @var bool this property is used to signal if we are inside a the first/root call to
40+
* MessageBusInterface::dispatch() or if dispatch has been called inside a message handler
41+
*/
42+
private $isRootDispatchCallRunning = false;
43+
44+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
45+
{
46+
if (null !== $envelope->last(DispatchAfterCurrentBusStamp::class)) {
47+
if (!$this->isRootDispatchCallRunning) {
48+
throw new \LogicException(sprintf('You can only use a "%s" stamp in the context of a message handler.', DispatchAfterCurrentBusStamp::class));
49+
}
50+
$this->queue[] = new QueuedEnvelope($envelope, $stack);
51+
52+
return $envelope;
53+
}
54+
55+
if ($this->isRootDispatchCallRunning) {
56+
/*
57+
* A call to MessageBusInterface::dispatch() was made from inside the main bus handling,
58+
* but the message does not have the stamp. So, process it like normal.
59+
*/
60+
return $stack->next()->handle($envelope, $stack);
61+
}
62+
63+
// First time we get here, mark as inside a "root dispatch" call:
64+
$this->isRootDispatchCallRunning = true;
65+
try {
66+
// Execute the whole middleware stack & message handling for main dispatch:
67+
$returnedEnvelope = $stack->next()->handle($envelope, $stack);
68+
} catch (\Throwable $exception) {
69+
/*
70+
* Whenever an exception occurs while handling a message that has
71+
* queued other messages, we drop the queued ones.
72+
* This is intentional since the queued commands were likely dependent
73+
* on the preceding command.
74+
*/
75+
$this->queue = [];
76+
$this->isRootDispatchCallRunning = false;
77+
78+
throw $exception;
79+
}
80+
81+
// "Root dispatch" call is finished, dispatch stored messages.
82+
$exceptions = [];
83+
while (null !== $queueItem = array_shift($this->queue)) {
84+
try {
85+
// Execute the stored messages
86+
$queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
87+
} catch (\Exception $exception) {
88+
// Gather all exceptions
89+
$exceptions[] = $exception;
90+
}
91+
}
92+
93+
$this->isRootDispatchCallRunning = false;
94+
if (\count($exceptions) > 0) {
95+
throw new DelayedMessageHandlingException($exceptions);
96+
}
97+
98+
return $returnedEnvelope;
99+
}
100+
}
101+
102+
/**
103+
* @internal
104+
*/
105+
final class QueuedEnvelope
106+
{
107+
/** @var Envelope */
108+
private $envelope;
109+
110+
/** @var StackInterface */
111+
private $stack;
112+
113+
public function __construct(Envelope $envelope, StackInterface $stack)
114+
{
115+
$this->envelope = $envelope;
116+
$this->stack = $stack;
117+
}
118+
119+
public function getEnvelope(): Envelope
120+
{
121+
return $this->envelope;
122+
}
123+
124+
public function getStack(): StackInterface
125+
{
126+
return $this->stack;
127+
}
128+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Symfony\Component\Messenger\Stamp;
15+
16+
/**
17+
* Marker item to tell this message should be handled in after the current bus has finished.
18+
*
19+
* @see \Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware
20+
*
21+
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
22+
*/
23+
class DispatchAfterCurrentBusStamp implements StampInterface
24+
{
25+
}

0 commit comments

Comments
 (0)
0