10000 Simplifying SyncTransport and fixing bug with handlers transport · symfony/symfony@ad655c0 · GitHub
[go: up one dir, main page]

Skip to content

Commit ad655c0

Browse files
committed
Simplifying SyncTransport and fixing bug with handlers transport
1 parent 72863e4 commit ad655c0

File tree

9 files changed

+46
-62
lines changed

9 files changed

+46
-62
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373

7474
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
7575
<tag name="messenger.transport_factory" />
76+
<argument type="service" id="messenger.routable_message_bus" />
7677
</service>
7778

7879
<service id="messenger.transport.in_memory.factory" class="Symfony\Component\Messenger\Transport\InMemoryTransportFactory">

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ CHANGELOG
2424
to stop all `messenger:consume` workers.
2525
* [BC BREAK] The `TransportFactoryInterface::createTransport()` signature
2626
changed: a required 3rd `SerializerInterface` argument was added.
27-
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
28-
explicitly handle messages synchronously.
27+
* Added a new `SyncTransport` to explicitly handle messages synchronously.
2928
* Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing.
3029
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
3130
AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead.

src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
18-
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
1918
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2019
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2120
use Symfony\Component\Messenger\Stamp\SentStamp;
@@ -78,12 +77,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
7877
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
7978
}
8079

81-
// if the message was marked (usually by SyncTransport) that it handlers
82-
// MUST be called, mark them to be handled.
83-
if (null !== $envelope->last(ForceCallHandlersStamp::class)) {
84-
$handle = true;
85-
}
86-
8780
// on a redelivery, only send back to queue: never call local handlers
8881
if (null !== $redeliveryStamp) {
8982
$handle = false;

src/Symfony/Component/Messenger/Stamp/ForceCallHandlersStamp.php

Lines changed: 0 additions & 27 deletions
This file was deleted.

src/Symfony/Component/Messenger/Tests/Middleware/SendMessageMiddlewareTest.php

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
1717
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
18-
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
1918
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2019
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2120
use Symfony\Component\Messenger\Stamp\SentStamp;
@@ -88,8 +87,6 @@ public function testItSendsTheMessageToMultipleSenders()
8887
public function testItSendsToOnlyOneSenderOnRedelivery()
8988
{
9089
$envelope = new Envelope(new DummyMessage('Hey'), [new RedeliveryStamp(5, 'bar')]);
91-
// even with a ForceCallHandlersStamp, the next middleware won't be called
92-
$envelope = $envelope->with(new ForceCallHandlersStamp());
9390
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
9491
$sender2 = $this->getMockBuilder(SenderInterface::class)->getMock();
9592

@@ -270,21 +267,6 @@ public function testItDoesNotDispatchOnRedeliver()
270267
$middleware->handle($envelope, $this->getStackMock(false));
271268
}
272269

273-
public function testItHandlesWithForceCallHandlersStamp()
274-
{
275-
$envelope = new Envelope(new DummyMessage('original envelope'));
276-
$envelope = $envelope->with(new ForceCallHandlersStamp());
277-
278-
$sender = $this->getMockBuilder(SenderInterface::class)->getMock();
279-
$sender->expects($this->once())->method('send')->willReturn($envelope);
280-
281-
$sendersLocator = $this->createSendersLocator([DummyMessage::class => ['foo']], ['foo' => $sender]);
282-
$middleware = new SendMessageMiddleware($sendersLocator);
283-
284-
// next handler *should* be called
285-
$middleware->handle($envelope, $this->getStackMock(true));
286-
}
287-
288270
private function createSendersLocator(array $sendersMap, array $senders, array $sendAndHandle = [])
289271
{
290272
$container = $this->createMock(ContainerInterface::class);

src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportFactoryTest.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\MessageBusInterface;
1516
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1617
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
1718
use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory;
@@ -21,7 +22,8 @@ class SyncTransportFactoryTest extends TestCase
2122
public function testCreateTransport()
2223
{
2324
$serializer = $this->createMock(SerializerInterface::class);
24-
$factory = new SyncTransportFactory();
25+
$bus = $this->createMock(MessageBusInterface::class);
26+
$factory = new SyncTransportFactory($bus);
2527
$transport = $factory->createTransport('sync://', [], $serializer);
2628
$this->assertInstanceOf(SyncTransport::class, $transport);
2729
}

src/Symfony/Component/Messenger/Tests/Transport/Sync/SyncTransportTest.php

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,29 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Envelope;
16-
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
16+
use Symfony\Component\Messenger\MessageBusInterface;
17+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
1718
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
1819

1920
class SyncTransportTest extends TestCase
2021
{
2122
public function testSend()
2223
{
24+
$bus = $this->createMock(MessageBusInterface::class);
25+
$bus->expects($this->once())
26+
->method('dispatch')
27+
->with($this->callback(function ($arg) {
28+
$this->assertInstanceOf(Envelope::class, $arg);
29+
30+
return true;
31+
}))
32+
->willReturnArgument(0);
2333
$message = new \stdClass();
2434
$envelope = new Envelope($message);
25-
$transport = new SyncTransport();
35+
$transport = new SyncTransport($bus);
2636
$envelope = $transport->send($envelope);
37+
2738
$this->assertSame($message, $envelope->getMessage());
28-
$this->assertNotNull($envelope->last(ForceCallHandlersStamp::class));
39+
$this->assertNotNull($envelope->last(ReceivedStamp::class));
2940
}
3041
}

src/Symfony/Component/Messenger/Transport/Sync/SyncTransport.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,27 @@
1313

1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16-
use Symfony\Component\Messenger\Stamp\ForceCallHandlersStamp;
16+
use Symfony\Component\Messenger\MessageBusInterface;
17+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
18+
use Symfony\Component\Messenger\Stamp\SentStamp;
1719
use Symfony\Component\Messenger\Transport\TransportInterface;
1820

1921
/**
20-
* A "fake" transport that marks messages to be handled immediately.
22+
* Transport that immediately marks messages as received and dispatches for handling.
2123
*
2224
* @experimental in 4.3
2325
*
2426
* @author Ryan Weaver <ryan@symfonycasts.com>
2527
*/
2628
class SyncTransport implements TransportInterface
2729
{
30+
private $messageBus;
31+
32+
public function __construct(MessageBusInterface $messageBus)
33+
{
34+
$this->messageBus = $messageBus;
35+
}
36+
2837
public function get(): iterable
2938
{
3039
throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.');
@@ -47,6 +56,12 @@ public function reject(Envelope $envelope): void
4756

4857
public function send(Envelope $envelope): Envelope
4958
{
50-
return $envelope->with(new ForceCallHandlersStamp());
59+
/** @var SentStamp|null $sentStamp */
60+
$sentStamp = $envelope->last(SentStamp::class);
61+
$alias = null === $sentStamp ? 'sync' : $sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass();
62+
63+
$envelope = $envelope->with(new ReceivedStamp($alias));
64+
65+
return $this->messageBus->dispatch($envelope);
5166
}
5267
}

src/Symfony/Component/Messenger/Transport/Sync/SyncTransportFactory.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\Sync;
1313

14+
use Symfony\Component\Messenger\MessageBusInterface;
1415
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1516
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
1617
use Symfony\Component\Messenger\Transport\TransportInterface;
@@ -22,9 +23,16 @@
2223
*/
2324
class SyncTransportFactory implements TransportFactoryInterface
2425
{
26+
private $messageBus;
27+
28+
public function __construct(MessageBusInterface $messageBus)
29+
{
30+
$this->messageBus = $messageBus;
31+
}
32+
2533
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
2634
{
27-
return new SyncTransport();
35+
return new SyncTransport($this->messageBus);
2836
}
2937

3038
public function supports(string $dsn, array $options): bool

0 commit comments

Comments
 (0)
0