8000 Continuing "Support for handling messages in different transaction" · symfony/symfony@fc3dc64 · GitHub
[go: up one dir, main page]

Skip to content

Commit fc3dc64

Browse files
committed
Continuing "Support for handling messages in different transaction"
1 parent a82ad8a commit fc3dc64

File tree

5 files changed

+245
-49
lines changed

5 files changed

+245
-49
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<argument /> <!-- Bus handler resolver -->
2929
</service>
3030

31-
<service id="messenger.middleware.handle_message_in_new_transaction" class="Symfony\Component\Messenger\Middleware\HandleMessageInNewTransactionMiddleware" abstract="true" />
31+
<service id="messenger.middleware.handle_message_in_new_transaction" class="Symfony\Component\Messenger\Middleware\HandleMessageInNewTransactionMiddleware" />
3232

3333
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
3434
<argument type="service" id="validator" />

src/Symfony/Component/Messenger/Exception/MessageHandlingException.php renamed to src/Symfony/Component/Messenger/Exception/QueuedMessageHandlingException.php

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,27 @@
1212
namespace Symfony\Component\Messenger\Exception;
1313

1414
/**
15-
* When handling messages, some handlers caused an exception. This exception
16-
* contains all those handler exceptions.
15+
* When handling queued messages from {@link HandleMessageInNewTransactionMiddleware},
16+
* some handlers caused an exception. This exception contains all those handler exceptions.
1717
*
1818
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
1919
*/
20-
class MessageHandlingException extends \RuntimeException implements ExceptionInterface
20+
class QueuedMessageHandlingException extends \RuntimeException implements ExceptionInterface
2121
{
22-
private $exceptions = array();
22+
private $exceptions;
2323

2424
public function __construct(array $exceptions)
2525
{
2626
$message = sprintf(
27-
"Some handlers for recorded messages threw an exception. Their messages were: \n\n%s",
27+
"Some handlers for queued messages threw an exception: \n\n%s",
2828
implode(", \n", array_map(function (\Throwable $e) {
29-
return $e->getMessage();
29+
return \get_class($e).': '.$e->getMessage();
3030
}, $exceptions))
3131
);
3232

3333
$this->exceptions = $exceptions;
34-
parent::__construct($message);
34+
35+
parent::__construct($message, 0, 1 === \count($exceptions) ? $exceptions[0] : null);
3536
}
3637

3738
public function getExceptions(): array

src/Symfony/Component/Messenger/Middleware/HandleMessageInNewTransactionMiddleware.php

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -12,79 +12,114 @@
1212
namespace Symfony\Component\Messenger\Middleware;
1313

1414
use Symfony\Component\Messenger\Envelope;
15-
use Symfony\Component\Messenger\EnvelopeAwareInterface;
16-
use Symfony\Component\Messenger\Exception\MessageHandlingException;
17-
use Symfony\Component\Messenger\Middleware\Configuration\Transaction;
15+
use Symfony\Component\Messenger\Exception\QueuedMessageHandlingException;
16+
use Symfony\Component\Messenger\Stamp\Transaction;
1817

1918
/**
20-
* Allow to configure messages to be handled in a new Doctrine transaction if using
21-
* the DoctrineTransactionMiddleware. This middleware should be used before DoctrineTransactionMiddleware.
19+
* Allow to configure messages to be handled in a new transaction.
20+
* I.e, messages dispatched from a handler with a Transaction stamp will actually be handled
21+
* once the current message being dispatched is fully handled or sent.
22+
*
23+
* For instance, using this middleware before the DoctrineTransactionMiddleware
24+
* means sub-dispatched messages with a Transaction item would be handled after
25+
* the Doctrine transaction has been committed.
2226
*
2327
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
2428
*/
25-
class HandleMessageInNewTransactionMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
29+
class HandleMessageInNewTransactionMiddleware implements MiddlewareInterface
2630
{
2731
/**
28-
* @var array A queue of messages and callables
32+
* @var QueuedEnvelope[] A queue of messages and next middleware
2933
*/
3034
private $queue = array();
3135

3236
/**
33-
* @var bool Indicate if we are running the middleware or not. Ie, are we called inside a message handler?
37+
* @var bool Indicates if we are running the middleware or not. I.e, are we called during a dispatch?
3438
*/
35-
private $insideMessageHandler = false;
39+
private $isRunning = false;
3640

37-
/**
38-
* @param Envelope $envelope
39-
*/
40-
public function handle($envelope, callable $next)
41+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
4142
{
42-
if (null !== $envelope->get(Transaction::class)) {
43-
if (!$this->insideMessageHandler) {
44-
throw new \LogicException('We have to use the transaction in the context of a message handler');
43+
if (null !== $envelope->last(Transaction::class)) {
44+
if (!$this->isRunning) {
45+
throw new \LogicException(sprintf('You can only use a "%s" stamp to define a new transaction in the context of a message handling.', Transaction::class));
4546
}
46-
$this->queue[] = array('envelope' => $envelope, 'callable' => $next);
47+
$this->queue[] = new QueuedEnvelope($envelope, $stack->next());
4748

48-
return;
49+
return $envelope;
4950
}
5051

51-
if ($this->insideMessageHandler) {
52+
if ($this->isRunning) {
5253
/*
53-
* If come inside a second message handler, just continue as normal. We should not
54-
* run the stored messages.
54+
* If come inside a second dispatch, just continue as normal.
55+
* We should not run the stored messages until first call is finished.
5556
*/
56-
return $next($envelope);
57+
return $stack->next()->handle($envelope, $stack);
5758
}
5859

59-
$this->insideMessageHandler = true;
60+
// First time we get here, mark as inside a root dispatch call:
61+
$this->isRunning = true;
6062
try {
61-
$returnData = $next($envelope);
63+
// Execute the whole middleware stack & message h F987 andling for main dispatch:
64+
$returnedEnvelope = $stack->next()->handle($envelope, $stack);
6265
} catch (\Throwable $exception) {
66+
/*
67+
* Whenever an exception occurs while handling a message that has
68+
* queued other messages, we drop the queued ones.
69+
* This is intentional since the queued commands were likely dependent
70+
* on the preceding command.
71+
*/
6372
$this->queue = array();
64-
$this->insideMessageHandler = false;
73+
$this->isRunning = false;
6574

6675
throw $exception;
6776
}
6877

78+
// Root dispatch call is finished, dispatch stored ones for real:
6979
$exceptions = array();
70-
while (!empty($queueItem = array_pop($this->queue))) {
80+
while (null !== $queueItem = array_shift($this->queue)) {
7181
try {
7282
// Execute the stored messages
73-
$queueItem['callable']($queueItem['envelope']);
83+
$queueItem->getNext()->handle($queueItem->getEnvelope(), $stack);
7484
} catch (\Throwable $exception) {
85+
// Gather all exceptions
7586
$exceptions[] = $exception;
7687
}
7788
}
7889

79-
// Assert: $this->queue is empty.
80-
$this->insideMessageHandler = false;
81-
if (!empty($exceptions)) {
82-
if (1 === \count($exceptions)) {
83-
throw $exceptions[0];
84-
}
85-
throw new MessageHandlingException($exceptions);
90+
$this->isRunning = false;
91+
if (\count($exceptions) > 0) {
92+
throw new QueuedMessageHandlingException($exceptions);
8693
}
8794

88-
return $returnData;
95+
return $returnedEnvelope;
96+
}
97+
}
98+
99+
/**
100+
* @internal
101+
*/
102+
final class QueuedEnvelope
103+
{
104+
/** @var Envelope */
105+
private $envelope;
106+
107+
/** @var MiddlewareInterface */
108+
private $next;
109+
110+
public function __construct(Envelope $envelope, MiddlewareInterface $next)
111+
{
112+
$this->envelope = $envelope;
113+
$this->next = $next;
114+
}
115+
116+
public function getEnvelope(): Envelope
117+
{
118+
return $this->envelope;
119+
}
120+
121+
public function getNext(): MiddlewareInterface
122+
{
123+
return $this->next;
89124
}
90125
}

src/Symfony/Component/Messenger/Middleware/Configuration/Transaction.php renamed to src/Symfony/Component/Messenger/Stamp/Transaction.php

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,15 @@
1111

1212
declare(strict_types=1);
1313

14-
namespace Symfony\Component\Messenger\Middleware\Configuration;
15-
16-
use Symfony\Component\Messenger\EnvelopeItemInterface;
14+
namespace Symfony\Component\Messenger\Stamp;
1715

1816
/**
19-
* Marker item to tell this message should be handled in a different Doctrine transaction.
20-
* This should be used together with HandleMessageInNewTransactionMiddleware and DoctrineTransactionMiddleware.
17+
* Marker item to tell this message should be handled in a different transaction.
18+
*
19+
* @see \Symfony\Component\Messenger\Middleware\HandleMessageInNewTransactionMiddleware
2120
*
2221
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
2322
*/
24-
class Transaction implements EnvelopeItemInterface
23+
class Transaction implements StampInterface
2524
{
2625
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Middleware;
13+
14+
use PHPUnit\Framework\MockObject\MockObject;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Exception\QueuedMessageHandlingException;
18+
use Symfony\Component\Messenger\MessageBus;
19+
use Symfony\Component\Messenger\MessageBusInterface;
20+
use Symfony\Component\Messenger\Middleware\HandleMessageInNewTransactionMiddleware;
21+
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
22+
use Symfony\Component\Messenger\Middleware\StackInterface;
23+
use Symfony\Component\Messenger\Stamp\Transaction;
24+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
25+
26+
class HandleMessageInNewTransactionMiddlewareTest extends TestCase
27+
{
28+
public function testEventsInNewTransactionAreHandledAfterMainMessage()
29+
{
30+
$message = new DummyMessage('Hello');
31+
32+
$firstEvent = new DummyEvent('First event');
33+
$secondEvent = new DummyEvent('Second event');
34+
$thirdEvent = new DummyEvent('Third event');
35+
36+
$transactionMiddleware = new HandleMessageInNewTransactionMiddleware();
37+
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
38+
39+
$eventBus = new MessageBus(array(
40+
$transactionMiddleware,
41+
$handlingMiddleware,
42+
));
43+
44+
$messageBus = new MessageBus(array(
45+
$transactionMiddleware,
46+
new DispatchingMiddleware($eventBus, array(
47+
new Envelope($firstEvent, new Transaction()),
48+
new Envelope($secondEvent, new Transaction()),
49+
$thirdEvent, // Not in a new transaction
50+
)),
51+
$handlingMiddleware,
52+
));
53+
54+
// Third event is dispatch within main dispatch, but before its handling:
55+
$this->expectHandledMessage($handlingMiddleware, 0, $thirdEvent);
56+
// Then expect main dispatched message to be handled first:
57+
$this->expectHandledMessage($handlingMiddleware, 1, $message);
58+
// Then, expect events in new transaction to be handled next, in dispatched order:
59+
$this->expectHandledMessage($handlingMiddleware, 2, $firstEvent);
60+
$this->expectHandledMessage($handlingMiddleware, 3, $secondEvent);
61+
62+
$messageBus->dispatch($message);
63+
}
64+
65+
public function testThrowingEventsHandlingWontStopExecution()
66+
{
67+
$message = new DummyMessage('Hello');
68+
69+
$firstEvent = new DummyEvent('First event');
70+
$secondEvent = new DummyEvent('Second event');
71+
72+
$transactionMiddleware = new HandleMessageInNewTransactionMiddleware();
73+
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
74+
75+
$eventBus = new MessageBus(array(
76+
$transactionMiddleware,
77+
$handlingMiddleware,
78+
));
79+
80+
$messageBus = new MessageBus(array(
81+
$transactionMiddleware,
82+
new DispatchingMiddleware($eventBus, array(
83+
new Envelope($firstEvent, new Transaction()),
84+
new Envelope($secondEvent, new Transaction()),
85+
)),
86+
$handlingMiddleware,
87+
));
88+
89+
// Expect main dispatched message to be handled first:
90+
$this->expectHandledMessage($handlingMiddleware, 0, $message);
91+
// Then, expect events in new transaction to be handled next, in dispatched order:
92+
$this->expectThrowingHandling($handlingMiddleware, 1, $firstEvent, new \RuntimeException('Some exception while handling first event'));
93+
// Next event is still handled despite the previous exception:
94+
$this->expectHandledMessage($handlingMiddleware, 2, $secondEvent);
95+
96+
$this->expectException(QueuedMessageHandlingException::class);
97+
$this->expectExceptionMessage('RuntimeException: Some exception while handling first event');
98+
99+
$messageBus->dispatch($message);
100+
}
101+
102+
/**
103+
* @param MiddlewareInterface|MockObject $handlingMiddleware
104+
*/
105+
private function expectHandledMessage(MiddlewareInterface $handlingMiddleware, int $at, $message): void
106+
{
107+
$handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
108+
return $envelope->getMessage() === $message;
109+
}))->willReturnCallback(function($envelope, StackInterface $stack) {
110+
return $stack->next()->handle($envelope, $stack);
111+
});
112+
}
113+
114+
/**
115+
* @param MiddlewareInterface|MockObject $handlingMiddleware
116+
*/
117+
private function expectThrowingHandling(MiddlewareInterface $handlingMiddleware, int $at, $message, \Throwable $throwable): void
118+
{
119+
$handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
120+
return $envelope->getMessage() === $message;
121+
}))->willReturnCallback(function() use ($throwable) {
122+
throw $throwable;
123+
});
124+
}
125+
}
126+
127+
class DummyEvent
128+
{
129+
private $message;
130+
131+
public function __construct(string $message)
132+
{
133+
$this->message = $message;
134+
}
135+
136+
public function getMessage(): string
137+
{
138+
return $this->message;
139+
}
140+
}
141+
142+
class DispatchingMiddleware implements MiddlewareInterface
143+
{
144+
private $bus;
145+
private $messages;
146+
147+
public function __construct(MessageBusInterface $bus, array $messages)
148+
{
149+
$this->bus = $bus;
150+
$this->messages = $messages;
151+
}
152+
153+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
154+
{
155+
foreach ($this->messages as $event) {
156+
$this->bus->dispatch($event);
157+
}
158+
159+
return $stack->next()->handle($envelope, $stack);
160+
}
161+
}

0 commit comments

Comments
 (0)
0