8000 [Messenger] Support configuring messages when dispatching · symfony/symfony@749054a · GitHub
[go: up one dir, main page]

Skip to content

Commit 749054a

Browse files
ogizanagisroze
authored andcommitted
[Messenger] Support configuring messages when dispatching
1 parent 47da23c commit 749054a

36 files changed

+648
-125
lines changed

src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313

1414
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
1515
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\EnvelopeAwareInterface;
1618
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
1719

1820
/**
1921
* @author Samuel Roze <samuel.roze@gmail.com>
2022
*/
21-
class SendMessageMiddleware implements MiddlewareInterface
23+
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
2224
{
2325
private $senderLocator;
2426

@@ -32,17 +34,19 @@ public function __construct(SenderLocatorInterface $senderLocator)
3234
*/
3335
public function handle($message, callable $next)
3436
{
35-
if ($message instanceof ReceivedMessage) {
36-
return $next($message->getMessage());
37+
$envelope = Envelope::wrap($message);
38+
if ($envelope->get(ReceivedMessage::class)) {
39+
// It's a received message. Do not send it back:
40+
return $next($message);
3741
}
3842

39-
if (!empty($senders = $this->senderLocator->getSendersForMessage($message))) {
43+
if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) {
4044
foreach ($senders as $sender) {
4145
if (null === $sender) {
4246
continue;
4347
}
4448

45-
$sender->send($message);
49+
$sender->send($envelope);
4650
}
4751

4852
if (!\in_array(null, $senders, true)) {

src/Symfony/Component/Messenger/Asynchronous/Transport/ReceivedMessage.php

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,26 @@
1212
namespace Symfony\Component\Messenger\Asynchronous\Transport;
1313

1414
use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware;
15+
use Symfony\Component\Messenger\EnvelopeItemInterface;
1516

1617
/**
17-
* Wraps a received message. This is mainly used by the `SendMessageMiddleware` middleware to identify
18+
* Marker config for a received message.
19+
* This is mainly used by the `SendMessageMiddleware` middleware to identify
1820
* a message should not be sent if it was just received.
1921
*
2022
* @see SendMessageMiddleware
2123
*
2224
* @author Samuel Roze <samuel.roze@gmail.com>
2325
*/
24-
final class ReceivedMessage
26+
final class ReceivedMessage implements EnvelopeItemInterface
2527
{
26-
private $message;
27-
28-
public function __construct($message)
28+
public function serialize()
2929
{
30-
$this->message = $message;
30+
return '';
3131
}
3232

33-
public function getMessage()
33+
public function unserialize($serialized)
3434
{
35-
return $this->message;
35+
// noop
3636
}
3737
}

src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Asynchronous\Transport;
1313

14+
use Symfony\Component\Messenger\Envelope;
1415
use Symfony\Component\Messenger\Transport\ReceiverInterface;
1516

1617
/**
@@ -27,12 +28,12 @@ public function __construct(ReceiverInterface $decoratedConsumer)
2728

2829
public function receive(callable $handler): void
2930
{
30-
$this->decoratedReceiver->receive(function ($message) use ($handler) {
31-
if (null !== $message) {
32-
$message = new ReceivedMessage($message);
31+
$this->decoratedReceiver->receive(function (?Envelope $envelope) use ($handler) {
32+
if (null !== $envelope) {
33+
$envelope = $envelope->with(new ReceivedMessage());
3334
}
3435

35-
$handler($message);
36+
$handler($envelope);
3637
});
3738
}
3839

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
/**
15+
* A message wrapped in an envelope with items (configurations, markers, ...).
16+
*
17+
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
18+
*
19+
* @experimental in 4.1
20+
*/
21+
final class Envelope
22+
{
23+
private $items = array();
24+
private $message;
25+
26+
/**
27+
* @param object $message
28+
* @param EnvelopeItemInterface[] $items
29+
*/
30+
public function __construct($message, array $items = array())
31+
{
32+
$this->message = $message;
33+
foreach ($items as $item) {
34+
$this->items[\get_class($item)] = $item;
35+
}
36+
}
37+
38+
/**
39+
* Wrap a message into an envelope if not already wrapped.
40+
*
41+
* @param Envelope|object $message
42+
*/
43+
public static function wrap($message): self
44+
{
45+
return $message instanceof self ? $message : new self($message);
46+
}
47+
48+
/**
49+
* @return Envelope a new Envelope instance with additional item
50+
*/
51+
public function with(EnvelopeItemInterface $item): self
52+
{
53+
$cloned = clone $this;
54+
55+
$cloned->items[\get_class($item)] = $item;
56+
57+
return $cloned;
58+
}
59+
60+
public function get(string $itemFqcn): ?EnvelopeItemInterface
61+
{
62+
return $this->items[$itemFqcn] ?? null;
63+
}
64+
65+
/**
66+
* @return EnvelopeItemInterface[] indexed by fqcn
67+
*/
68+
public function all(): array
69+
{
70+
return $this->items;
71+
}
72+
73+
/**
74+
* @return object The original message contained in the envelope
75+
*/
76+
public function getMessage()
77+
{
78+
return $this->message;
79+
}
80+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
/**
15+
* A Messenger protagonist aware of the message envelope and its content.
16+
*
17+
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
18+
*
19+
* @experimental in 4.1
20+
*/
21+
interface EnvelopeAwareInterface
22+
{
23+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
/**
15+
* An envelope item related to a message.
16+
* This item must be serializable for transport.
17+
*
18+
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
19+
*
20+
* @experimental in 4.1
21+
*/
22+
interface EnvelopeItemInterface extends \Serializable
23+
{
24+
}

src/Symfony/Component/Messenger/MessageBus.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ private function callableForNextMiddleware(int $index): callable
5555
$middleware = $this->indexedMiddlewares[$index];
5656

5757
return function ($message) use ($middleware, $index) {
58+
$message = Envelope::wrap($message);
59+
if (!$middleware instanceof EnvelopeAwareInterface) {
60+
// Do not provide the envelope if the middleware cannot read it:
61+
$message = $message->getMessage();
62+
}
63+
5864
return $middleware->handle($message, $this->callableForNextMiddleware($index + 1));
5965
};
6066
}

src/Symfony/Component/Messenger/MessageBusInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ interface MessageBusInterface
2323
*
2424
* The bus can return a value coming from handlers, but is not required to do so.
2525
*
26-
* @param object $message
26+
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
2727
*
2828
* @return mixed
2929
*/
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware\Configuration;
13+
14+
use Symfony\Component\Messenger\EnvelopeItemInterface;
15+
use Symfony\Component\Validator\Constraints\GroupSequence;
16+
17+
/**
18+
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
19+
*
20+
* @experimental in 4.1
21+
*/
22+
final class ValidationConfiguration implements EnvelopeItemInterface
23+
{
24+
private $groups;
25+
26+
/**
27+
* @param string[]|GroupSequence $groups
28+
*/
29+
public function __construct($groups)
30+
{
31+
$this->groups = $groups;
32+
}
33+
34+
public function getGroups()
35+
{
36+
return $this->groups;
37+
}
38+
39+
public function serialize()
40+
{
41+
$isGroupSequence = $this->groups instanceof GroupSequence;
42+
43+
return serialize(array(
44+
'groups' => $isGroupSequence ? $this->groups->groups : $this->groups,
45+
'is_group_sequence' => $isGroupSequence,
46+
));
47+
}
48+
49+
public function unserialize($serialized)
50+
{
51+
list(
52+
'groups' => $groups,
53+
'is_group_sequence' => $isGroupSequence
54+
) = unserialize($serialized, array('allowed_classes' => false));
55+
56+
$this->__construct($isGroupSequence ? new GroupSequence($groups) : $groups);
57+
}
58+
}

src/Symfony/Component/Messenger/Middleware/LoggingMiddleware.php

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
namespace Symfony\Component\Messenger\Middleware;
1313

14-
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
1514
use Psr\Log\LoggerInterface;
1615

1716
/**
@@ -51,10 +50,6 @@ public function handle($message, callable $next)
5150

5251
private function createContext($message): array
5352
{
54-
if ($message instanceof ReceivedMessage) {
55-
$message = $message->getMessage();
56-
}
57-
5853
return array(
5954
'message' => $message,
6055
'class' => \get_class($message),

0 commit comments

Comments
 (0)
0