8000 Adding the "sync" transport to call handlers synchronously · symfony/symfony@3da5a43 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3da5a43

Browse files
committed
Adding the "sync" transport to call handlers synchronously
1 parent 162d5a8 commit 3da5a43

File tree

7 files changed

+144
-2
lines changed

7 files changed

+144
-2
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@
6767
<argument type="service" id="messenger.transport.serializer" />
6868
</service>
6969

70+
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
71+
<tag name="messenger.transport_factory" />
72+
</service>
73+
7074
<!-- retry -->
7175
<service id="messenger.retry_strategy_locator">
7276
<tag name="container.service_locator" />

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
8+
explicitly handle messages asynchronously.
79
* Added optional parameter `prefetch_count` in connection configuration,
810
to setup channel prefetch count
911
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`

src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
18+
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
1819
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
1920
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2021
use Symfony\Component\Messenger\Stamp\SentStamp;
@@ -81,7 +82,13 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
8182
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
8283
}
8384

84-
// on a redelivery, never call local handlers
85+
// if the message was marked (usually by SyncTransport) that it handlers
86+
// MUST be called, mark them to be handled.
87+
if (null !== $envelope->last(ForceCallHandlersStamp::class)) {
88+
$handle = true;
89+
}
90+
91+
// on a redelivery, only send back to queue: never call local handlers
8592
if (null !== $redeliveryStamp) {
8693
$handle = false;
8794
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* Stamp marks that the handlers *should* be called immediately.
16+
*
17+
* This is used by the SyncTransport to indicate to the
18+
* SendMessageMiddleware that handlers *should* be called
19+
* immediately, even though a transport was set.
20+
*
21+
* @experimental in 4.3
22+
*
23+
* @author Ryan Weaver <ryan@symfonycasts.com>
24+
*/
25+
class ForceCallHandlersStamp implements StampInterface
26+
{
27+
}

src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
1616
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
17+
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
1718
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
1819
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1920
use Symfony\Component\Messenger\Stamp\SentStamp;
@@ -86,6 +87,8 @@ public function testItSendsTheMessageToMultipleSenders()
8687
public function testItSendsToOnlyOneSenderOnRedelivery()
8788
{
8889
$envelope = new Envelope(new DummyMessage('Hey'), new RedeliveryStamp(5, 'bar'));
90+
// even with a ForceCallHandlersStamp, the next middleware won't be called
91+
$envelope = $envelope->with(new ForceCallHandlersStamp());
8992
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
9093
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
9194

@@ -237,7 +240,7 @@ public function testItDoesNotDispatchWithNoSenders()
237240
$middleware->handle($envelope, $this->getStackMock());
238241
}
239242

240-
public function testItDoesNotDispatchOnRetry()
243+
public function testItDoesNotDispatchOnRedeliver()
241244
{
242245
$envelope = new Envelope(new DummyMessage('original envelope'));
243246
$envelope = $envelope->with(new RedeliveryStamp(3, 'foo_sender'));
@@ -251,4 +254,18 @@ public function testItDoesNotDispatchOnRetry()
251254

252255
$middleware->handle($envelope, $this->getStackMock(false));
253256
}
257+
258+
public function testItHandlesWithForceCallHandlersStamp()
259+
{
260+
$envelope = new Envelope(new DummyMessage('original envelope'));
261+
$envelope = $envelope->with(new ForceCallHandlersStamp());
262+
263+
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
264+
$sender->expects($this->once())->method('send')->willReturn($envelope);
265+
266+
$middleware = new SendMessageMiddleware(new SendersLocator([DummyMessage::class => [$sender]]));
267+
268+
// next handler *should* be called
269+
$middleware->handle($envelope, $this->getStackMock(true));
270+
}
254271
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Sync;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16+
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
17+
use Symfony\Component\Messenger\Transport\TransportInterface;
18+
19+
/**
20+
* A "fake" transport that marks messages to be handled immediately.
21+
*
22+
* @experimental in 4.3
23+
*
24+
* @author Ryan Weaver <ryan@symfonycasts.com>
25+
*/
26+
class SyncTransport implements TransportInterface
27+
{
28+
public function receive(callable $handler): void
29+
{
30+
throw new InvalidArgumentException('You cannot receive messages from the SyncTransport.');
31+
}
32+
33+
public function stop(): void
34+
{
35+
throw new InvalidArgumentException('You cannot call stop() on the SyncTransport.');
36+
}
37+
38+
public function ack(Envelope $envelope): void
39+
{
40+
throw new InvalidArgumentException('You cannot call ack() on the SyncTransport.');
41+
}
42+
43+
public function reject(Envelope $envelope): void
44+
{
45+
throw new InvalidArgumentException('You cannot call reject() on the SyncTransport.');
46+
}
47+
48+
public function send(Envelope $envelope): Envelope
49+
{
50+
return $envelope->with(new ForceCallHandlersStamp());
51+
}
52+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Sync;
13+
14+
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
15+
use Symfony\Component\Messenger\Transport\TransportInterface;
16+
17+
/**
18+
* @experimental in 4.3
19+
*
20+
* @author Ryan Weaver <ryan@symfonycasts.com>
21+
*/
22+
class SyncTransportFactory implements TransportFactoryInterface
23+
{
24+
public function createTransport(string $dsn, array $options): TransportInterface
25+
{
26+
return new SyncTransport();
27+
}
28+
29+
public function supports(string $dsn, array $options): bool
30+
{
31+
return 0 === strpos($dsn, 'sync://');
32+
}
33+
}

0 commit comments

Comments
 (0)
0