8000 Continuing "Support for handling messages in different transaction" · symfony/symfony@fc3dc64 · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fc3dc64

Browse files
committed
Continuing "Support for handling messages in different transaction"
1 parent a82ad8a commit fc3dc64

File tree

5 files changed

+245
-49
lines changed

5 files changed

+245
-49
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<argument /> <!-- Bus handler resolver -->
2929
</service>
3030

31-
<service id="messenger.middleware.handle_message_in_new_transaction" class="Symfony\Component\Messenger\Middleware\HandleMessageInNewTransactionMiddleware" abstract="true" />
31+
<service id="messenger.middleware.handle_message_in_new_transaction" class="Symfony\Component\Messenger\Middleware\HandleMessageInNewTransactionMiddleware" />
3232

3333
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
3434
<argument type="service" id="validator" />

src/Symfony/Component/Messenger/Exception/MessageHandlingException.php renamed to src/Symfony/Component/Messenger/Exception/QueuedMessageHandlingException.php

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,27 @@
1212
namespace Symfony\Component\Messenger\Exception;
1313

1414
/**
15-
* When handling messages, some handlers caused an exception. This exception
16-
* contains all those handler exceptions.
15+
* When handling queued messages from {@link HandleMessageInNewTransactionMiddleware},
16+
* some handlers caused an exception. This exception contains all those handler exceptions.
1717
*
1818
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
1919
*/
20-
class MessageHandlingException extends \RuntimeException implements ExceptionInterface
20+
class QueuedMessageHandlingException extends \RuntimeException implements ExceptionInterface
2121
{
22-
private $exceptions = array();
22+
private $exceptions;
2323

2424
public function __construct(array $exceptions)
2525
{
2626
$message = sprintf(
27-
"Some handlers for recorded messages threw an exception. Their messages were: \n\n%s",
27+
"Some handlers for queued messages threw an exception: \n\n%s",
2828
implode(", \n", array_map(function (\Throwable $e) {
29-
return $e->getMessage();
29+
return \get_class($e).': '.$e->getMessage();
3030
}, $exceptions))
3131
);
3232

3333
$this->exceptions = $exceptions;
34-
parent::__construct($message);
34+
35+
parent::__construct($message, 0, 1 === \count($exceptions) ? $exceptions[0] : null);
3536
}
3637

3738
public function getExceptions(): array

src/Symfony/Component/Messenger/Middleware/HandleMessageInNewTransactionMiddleware.php

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -12,79 +12,114 @@
1212
namespace Symfony\Component\Messenger\Middleware;
1313

1414
use Symfony\Component\Messenger\Envelope;
15-
use Symfony\Component\Messenger\EnvelopeAwareInterface;
16-
use Symfony\Component\Messenger\Exception\MessageHandlingException;
17-
use Symfony\Component\Messenger\Middleware\Configuration\Transaction;
15+
use Symfony\Component\Messenger\Exception\QueuedMessageHandlingException;
16+
use Symfony\Component\Messenger\Stamp\Transaction;
1817

1918
/**
20-
* Allow to configure messages to be handled in a new Doctrine transaction if using
21-
* the DoctrineTransactionMiddleware. This middleware should be used before DoctrineTransactionMiddleware.
19+
* Allow to configure messages to be handled in a new transaction.
20+
* I.e, messages dispatched from a handler with a Transaction stamp will actually be handled
21+
* once the current message being dispatched is fully handled or sent.
22+
*
23+
* For instance, using this middleware before the DoctrineTransactionMiddleware
24+
* means sub-dispatched messages with a Transaction item would be handled after
25+
* the Doctrine transaction has been committed.
2226
*
2327
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
2428
*/
25-
class HandleMessageInNewTransactionMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
29+
class HandleMessageInNewTransactionMiddleware implements MiddlewareInterface
2630
{
2731
/**
28-
* @var array A queue of messages and callables
32+
* @var QueuedEnvelope[] A queue of messages and next middleware
2933
*/
3034
private $queue = array();
3135

3236
/**
33-
* @var bool Indicate if we are running the middleware or not. Ie, are we called inside a message handler?
37+
* @var bool Indicates if we are running the middleware or not. I.e, are we called during a dispatch?
3438
*/
35-
private $insideMessageHandler = false;
39+
private $isRunning = false;
3640

37-
/**
38-
* @param Envelope $envelope
39-
*/
40-
public function handle($envelope, callable $next)
41+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
4142
{
42-
if (null !== $envelope->get(Transaction::class)) {
43-
if (!$this->insideMessageHandler) {
44-
throw new \LogicException('We have to use the transaction in the context of a message handler');
43+
if (null !== $envelope->last(Transaction::class)) {
44+
if (!$this->isRunning) {
45+
throw new \LogicException(sprintf('You can only use a "%s" stamp to define a new transaction in the context of a message handling.', Transaction::class));
4546
}
46-
$this->queue[] = array('envelope' => $envelope, 'callable' => $next);
47+
$this->queue[] = new QueuedEnvelope($envelope, $stack->next());
4748

48-
return;
49+
return $envelope;
4950
}
5051

51-
if ($this->insideMessageHandler) {
52+
if ($this->isRunning) {
5253
/*
53-
* If come inside a second message handler, just continue as normal. We should not
54-
* run the stored messages.
54+
* If come inside a second dispatch, just continue as normal.
55+
* We should not run the stored messages until first call is finished.
5556
*/
56-
return $next($envelope);
57+
return $stack->next()->handle($envelope, $stack);
5758
}
5859

59-
$this->insideMessageHandler = true;
60+
// First time we get here, mark as inside a root dispatch call:
61+
$this->isRunning = true;
6062
try {
61-
$returnData = $next($envelope);
63+
// Execute the whole middleware stack & message handling for main dispatch:
64+
$returnedEnvelope = $stack->next()->handle($envelope, $stack);
6265
} catch (\Throwable $exception) {
66+
/*
67+
* Whenever an exception occurs while handling a message that has
68+
* queued other messages, we drop the queued ones.
69+
* This is intentional since the queued commands were likely dependent
70+
* on the preceding command.
71+
*/
6372
$this->queue = array();
64-
$this->insideMessageHandler = false;
73+
$this->isRunning = false;
6574

6675
throw $exception;
6776
}
6877

78+
// Root dispatch call is finished, dispatch stored ones for real:
6979
$exceptions = array();
70-
while (!empty($queueItem = array_pop($this->queue))) {
80+
while (null !== $queueItem = array_shift($this->queue)) {
7181
try {
7282
// Execute the stored messages
73-
$queueItem['callable']($queueItem['envelope']);
83+
$queueItem->getNext()->handle($queueItem->getEnvelope(), $stack);
7484
} catch (\Throwable $exception) {
85+
// Gather all exceptions
7586
$exceptions[] = $exception;
7687
}
7788
}
7889

79-
// Assert: $this->queue is empty.
80-
$this->insideMessageHandler = false;
81-
if (!empty($exceptions)) {
82-
if (1 === \count($exceptions)) {
83-
throw $exceptions[0];
84-
}
85-
throw new MessageHandlingException($exceptions);
90+
$this->isRunning = false;
91+
if (\count($exceptions) > 0) {
92+
throw new QueuedMessageHandlingException($exceptions);
8693
}
8794

88-
return $returnData;
95+
return $returnedEnvelope;
96+
}
97+
}
98+
99+
/**
100+
* @internal
101+
*/
102+
final class QueuedEnvelope
103+
{
104+
/** @var Envelope */
105+
private $envelope;
106+
107+
/** @var MiddlewareInterface */
108+
private $next;
109+
110+
public function __construct(Envelope $envelope, MiddlewareInterface $next)
111+
{
112+
$this->envelope = $envelope;
113+
$this->next = $next;
114+
}
115+
116+
public function getEnvelope(): Envelope
117+
{
118+
return $this->envelope;
119+
}
120+
121+
public function getNext(): MiddlewareInterface
122+
{
123+
return $this->next;
89124
}
90125
}

src/Symfony/Component/Messenger/Middleware/Configuration/Transaction.php renamed to src/Symfony/Component/Messenger/Stamp/Transaction.php

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,15 @@
1111

1212
declare(strict_types=1);
1313

14-
namespace Symfony\Component\Messenger\Middleware\Configuration;
15-
16-
use Symfony\Component\Messenger\EnvelopeItemInterface;
14+
namespace Symfony\Component\Messenger\Stamp;
1715

1816
/**
19-
* Marker item to tell this message should be handled in a different Doctrine transaction.
20-
* This should be used together with HandleMessageInNewTransactionMiddleware and DoctrineTransactionMiddleware.
17+
* Marker item to tell this message should be handled in a different transaction.
18+
*
19+
* @see \Symfony\Component\Messenger\Middleware\HandleMessageInNewTransactionMiddleware
2120
*
2221
* @author Tobias Nyholm <tobias.nyholm@gmail.com>
2322
*/
24-
class Transaction implements EnvelopeItemInterface
23+
class Transaction implements StampInterface
2524
{
2625
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Middleware;
13+
14+
use PHPUnit\Framework\MockObject\MockObject;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Exception\QueuedMessageHandlingException;
18+
use Symfony\Component\Messenger\MessageBus;
19+
use Symfony\Component\Messenger\MessageBusInterface;
20+
use Symfony\Component\Messenger\Middleware\HandleMessageInNewTransactionMiddleware;
21+
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
22+
use Symfony\Component\Messenger\Middleware\StackInterface;
23+
use Symfony\Component\Messenger\Stamp\Transaction;
24+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
25+
26+
class HandleMessageInNewTransactionMiddlewareTest extends TestCase
27+
{
28+
public function testEventsInNewTransactionAreHandledAfterMainMessage()
29+
{
30+
$message = new DummyMessage('Hello');
31+
32+
$firstEvent = new DummyEvent('First event');
33+
$secondEvent = new DummyEvent('Second event');
34+
$thirdEvent = new DummyEvent('Third event');
35+
36+
$transactionMiddleware = new HandleMessageInNewTransactionMiddleware();
37+
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
38+
39+
$eventBus = new MessageBus(array(
40+
$transactionMiddleware,
41+
$handlingMiddleware,
42+
));
43+
44+
$messageBus = new MessageBus(array(
45+
$transactionMiddleware,
46+
new DispatchingMiddleware($eventBus, array(
47+
new Envelope($firstEvent, new Transaction()),
48+
new Envelope($secondEvent, new Transaction()),
49+
$thirdEvent, // Not in a new transaction
50+
)),
51+
$handlingMiddleware,
52+
));
53+
54+
// Third event is dispatch within main dispatch, but before its handling:
55+
$this->expectHandledMessage($handlingMiddleware, 0, $thirdEvent);
56+
// Then expect main dispatched message to be handled first:
57+
$this->expectHandledMessage($handlingMiddleware, 1, $message);
58+
// Then, expect events in new transaction to be handled next, in dispatched order:
59+
$this->expectHandledMessage($handlingMiddleware, 2, $firstEvent);
60+
$this->expectHandledMessage($handlingMiddleware, 3, $secondEvent);
61+
62+
$messageBus->dispatch($message);
63+
}
64+
65+
public function testThrowingEventsHandlingWontStopExecution()
66+
{
67+
$message = new DummyMessage('Hello');
68+
69+
$firstEvent = new DummyEvent('First event');
70+
$secondEvent = new DummyEvent('Second event');
71+
72+
$transactionMiddleware = new HandleMessageInNewTransactionMiddleware();
73+
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
74+
75+
$eventBus = new MessageBus(array(
76+
$transactionMiddleware,
77+
$handlingMiddleware,
78+
));
79+
80+
$messageBus = new MessageBus(array(
81+
$transactionMiddleware,
82+
new DispatchingMiddleware($eventBus, array(
83+
new Envelope($firstEvent, new Transaction()),
84+
new Envelope($secondEvent, new Transaction()),
85+
)),
86+
$handlingMiddleware,
87+
));
88+
89+
// Expect main dispatched message to be handled first:
90+
$this->expectHandledMessage($handlingMiddleware, 0, $message);
91+
// Then, expect events in new transaction to be handled next, in dispatched order:
92+
$this->expectThrowingHandling($handlingMiddleware, 1, $firstEvent, new \RuntimeException('Some exception while handling first event'));
93+
// Next event is still handled despite the previous exception:
94+
$this->expectHandledMessage($handlingMiddleware, 2, $secondEvent);
95+
96+
$this->expectException(QueuedMessageHandlingException::class);
97+
$this->expectExceptionMessage('RuntimeException: Some exception while handling first event');
98+
99+
$messageBus->dispatch($message);
100+
}
101+
102+
/**
103+
* @param MiddlewareInterface|MockObject $handlingMiddleware
104+
*/
105+
private function expectHandledMessage(MiddlewareInterface $handlingMiddleware, int $at, $message): void
106+
{
107+
$handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
108+
return $envelope->getMessage() === $message;
109+
}))->willReturnCallback(function($envelope, StackInterface $stack) {
110+
return $stack->next()->handle($envelope, $stack);
111+
});
112+
}
113+
114+
/**
115+
* @param MiddlewareInterface|MockObject $handlingMiddleware
116+
*/
117+
private function expectThrowingHandling(MiddlewareInterface $handlingMiddleware, int $at, $message, \Throwable $throwable): void
118+
{
119+
$handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
120+
return $envelope->getMessage() === $message;
121+
}))->willReturnCallback(function() use ($throwable) {
122+
throw $throwable;
123+
});
124+
}
125+
}
126+
127+
class DummyEvent
128+
{
129+
private $message;
130+
131+
public function __construct(string $message)
132+
{
133+
$this->message = $message;
134+
}
135+
136+
public function getMessage(): string
137+
{
138+
return $this->message;
139+
}
140+
}
141+
142+
class DispatchingMiddleware implements MiddlewareInterface
143+
{
144+
private $bus;
145+
private $messages;
146+
147+
public function __construct(MessageBusInterface $bus, array $messages)
148+
{
149+
$this->bus = $bus;
150+
$this->messages = $messages;
151+
}
152+
153+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
154+
{
155+
foreach ($this->messages as $event) {
156+
$this->bus->dispatch($event);
157+
}
158+
159+
return $stack->next()->handle($envelope, $stack);
160+
}
161+
}

0 commit comments

Comments
 (0)
0