10000 [Messenger] Add TransportInterface as first class citizen sender+rece… · symfony/symfony@bd0733d · GitHub
[go: up one dir, main page]

Skip to content

Commit bd0733d

Browse files
[Messenger] Add TransportInterface as first class citizen sender+receiver
1 parent cada38f commit bd0733d

File tree

10 files changed

+131
-69
lines changed

10 files changed

+131
-69
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
use Symfony\Component\Messenger\MessageBusInterface;
6666
use Symfony\Component\Messenger\Transport\ReceiverInterface;
6767
use Symfony\Component\Messenger\Transport\SenderInterface;
68+
use Symfony\Component\Messenger\Transport\TransportInterface;
6869
use Symfony\Component\PropertyAccess\PropertyAccessor;
6970
use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface;
7071
use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface;
@@ -1506,19 +1507,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
15061507
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
15071508

15081509
foreach ($config['transports'] as $name => $transport) {
1509-
$senderDefinition = (new Definition(SenderInterface::class))
1510-
->setFactory(array(new Reference('messenger.transport_factory'), 'createSender'))
1511-
->setArguments(array($transport['dsn'], $transport['options']))
1512-
->addTag('messenger.sender', array('name' => $name))
1513-
;
1514-
$container->setDefinition('messenger.sender.'.$name, $senderDefinition);
1515-
1516-
$receiverDefinition = (new Definition(ReceiverInterface::class))
1517-
->setFactory(ar 57A6 ray(new Reference('messenger.transport_factory'), 'createReceiver'))
1510+
$transportDefinition = (new Definition(TransportInterface::class))
1511+
->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
15181512
->setArguments(array($transport['dsn'], $transport['options']))
15191513
->addTag('messenger.receiver', array('name' => $name))
1514+
->addTag('messenger.sender', array('name' => $name))
15201515
;
1521-
$container->setDefinition('messenger.receiver.'.$name, $receiverDefinition);
1516+
$container->setDefinition('messenger.transport.'.$name, $transportDefinition);
15221517
}
15231518
}
15241519

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -532,30 +532,20 @@ public function testMessenger()
532532
public function testMessengerTransports()
533533
{
534534
$container = $this->createContainerFromFile('messenger_transports');
535-
$this->assertTrue($container->hasDefinition('messenger.sender.default'));
536-
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
537-
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.sender.default')->getTag('messenger.sender'));
538-
$this->assertTrue($container->hasDefinition('messenger.receiver.default'));
539-
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));
540-
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.receiver.default')->getTag('messenger.receiver'));
541-
542-
$this->assertTrue($container->hasDefinition('messenger.sender.customised'));
543-
$senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory();
544-
$senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments();
545-
546-
$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createSender'), $senderFactory);
547-
$this->assertCount(2, $senderArguments);
548-
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]);
549-
$this->assertSame(array('queue' => array('name' => 'Queue')), $senderArguments[1]);
550-
551-
$this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
552-
$receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory();
553-
$receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments();
554-
555-
$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createReceiver'), $receiverFactory);
556-
$this->assertCount(2, $receiverArguments);
557-
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]);
558-
$this->assertSame(array('queue' => array('name' => 'Queue')), $receiverArguments[1]);
535+
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
536+
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
537+
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender'));
538+
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
539+
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender'));
540+
541+
$this->assertTrue($container->hasDefinition('messenger.transport.customised'));
542+
$transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
543+
$transportArguments = $container->getDefinition('messenger.transport.customised')->getArguments();
544+
545+
$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createTransport'), $transportFactory);
546+
$this->assertCount(2, $transportArguments);
547+
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $transportArguments[0]);
548+
$this->assertSame(array('queue' => array('name' => 'Queue')), $transportArguments[1]);
559549
}
560550

561551
public function testMessengerRouting()

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public function __construct(EncoderInterface $messageEncoder, Connection $connec
3333
/**
3434
* {@inheritdoc}
3535
*/
36-
public function send($message)
36+
public function send($message): void
3737
{
3838
$encodedMessage = $this->messageEncoder->encode($message);
3939

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\AmqpExt;
13+
14+
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
15+
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
16+
use Symfony\Component\Messenger\Transport\TransportInterface;
17+
18+
/**
19+
* @author Nicolas Grekas <p@tchwork.com>
20+
*/
21+
class AmqpTransport implements TransportInterface
22+
{
23+
private $encoder;
24+
private $decoder;
25+
private $dsn;
26+
private $options;
27+
private $debug;
28+
private $connection;
29+
private $receiver;
30+
private $sender;
31+
32+
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, string $dsn, array $options, bool $debug)
33+
{
34+
$this->encoder = $encoder;
35+
$this->decoder = $decoder;
36+
$this->dsn = $dsn;
37+
$this->options = $options;
38+
$this->debug = $debug;
39+
}
40+
41+
/**
42+
* {@inheritdoc}
43+
*/
44+
public function receive(callable $handler): void
45+
{
46+
($this->receiver ?? $this->getReceiver())->receive($hander);
47+
}
48+
49+
/**
50+
* {@inheritdoc}
51+
*/
52+
public function stop(): void
53+
{
54+
($this->receiver ?? $this->getReceiver())->stop();
55+
}
56+
57+
/**
58+
* {@inheritdoc}
59+
*/
60+
public function send($message): void
61+
{
62+
($this->sender ?? $this->getSender())->send($message);
63+
}
64+
65+
private function getReceiver()
66+
{
67+
return $this->receiver = new AmqpReceiver($this->decoder, $this->connection ?? $this->getConnection());
68+
}
69+
70+
private function getSender()
71+
{
72+
return $this->sender = new AmqpSender($this->encoder, $this->connection ?? $this->getConnection());
73+
}
74+
75+
private function getConnection()
76+
{
77+
return $this->connection = new Connection($this->dsn, $this->options, $this->debug);
78+
}
79+
}

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

14-
use Symfony\Component\Messenger\Transport\Factory\TransportFactoryInterface;
15-
use Symfony\Component\Messenger\Transport\ReceiverInterface;
16-
use Symfony\Component\Messenger\Transport\SenderInterface;
1714
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
1815
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
16+
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
17+
use Symfony\Component\Messenger\Transport\TransportInterface;
1918

2019
/**
2120
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -33,14 +32,9 @@ public function __construct(EncoderInterface $encoder, DecoderInterface $decoder
3332
$this->debug = $debug;
3433
}
3534

36-
public function createReceiver(string $dsn, array $options): ReceiverInterface
35+
public function createTransport(string $dsn, array $options): TransportInterface
3736
{
38-
return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
39-
}
40-
41-
public function createSender(string $dsn, array $options): SenderInterface
42-
{
43-
return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug));
37+
return new AmqpTransport($this->encoder, $this->decoder, $dsn, $options, $thid->debug);
4438
}
4539

4640
public function supports(string $dsn, array $options): bool

src/Symfony/Component/Messenger/Transport/ReceiverInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ interface ReceiverInterface
2424
* The handler will have, as argument, the received message. Note that this message
2525
* can be `null` if the timeout to receive something has expired.
2626
*/
27-
public function receive(callable $handler) : void;
27+
public function receive(callable $handler): void;
2828

2929
/**
3030
* Stop receiving some messages.

src/Symfony/Component/Messenger/Transport/SenderInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ interface SenderInterface
2323
*
2424
* @param object $message
2525
*/
26-
public function send($message);
26+
public function send($message): void;
2727
}

src/Symfony/Component/Messenger/Transport/Factory/ChainTransportFactory.php renamed to src/Symfony/Component/Messenger/Transport/TransportFactory.php

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,12 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
namespace Symfony\Component\Messenger\Transport\Factory;
13-
14-
use Symfony\Component\Messenger\Transport\ReceiverInterface;
15-
use Symfony\Component\Messenger\Transport\SenderInterface;
12+
namespace Symfony\Component\Messenger\Transport;
1613

1714
/**
1815
* @author Samuel Roze <samuel.roze@gmail.com>
1916
*/
20-
class ChainTransportFactory implements TransportFactoryInterface
17+
class TransportFactory implements TransportFactoryInterface
2118
{
2219
private $factories;
2320

@@ -29,22 +26,11 @@ public function __construct(iterable $factories)
2926
$this->factories = $factories;
3027
}
3128

32-
public function createReceiver(string $dsn, array $options): ReceiverInterface
33-
{
34-
foreach ($this->factories as $factory) {
35-
if ($factory->supports($dsn, $options)) {
36-
return $factory->createReceiver($dsn, $options);
37-
}
38-
}
39-
40-
throw new \InvalidArgumentException(sprintf('No transport supports the given DSN "%s".', $dsn));
41-
}
42-
43-
public function createSender(string $dsn, array $options): SenderInterface
29+
public function createTransport(string $dsn, array $options): TransportInterface
4430
{
4531
foreach ($this->factories as $factory) {
4632
if ($factory->supports($dsn, $options)) {
47-
return $factory->createSender($dsn, $options);
33+
return $factory->createTransport($dsn, $options);
4834
}
4935
}
5036

src/Symfony/Component/Messenger/Transport/Factory/TransportFactoryInterface.php renamed to src/Symfony/Component/Messenger/Transport/TransportFactoryInterface.php

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
namespace Symfony\Component\Messenger\Transport\Factory;
13-
14-
use Symfony\Component\Messenger\Transport\ReceiverInterface;
15-
use Symfony\Component\Messenger\Transport\SenderInterface;
12+
namespace Symfony\Component\Messenger\Transport;
1613

1714
/**
1815
* Creates a Messenger transport.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*
17+
* @experimental in 4.1
18+
*/
19+
interface TransportInterface extends ReceiverInterface, SenderInterface
20+
{
21+
}

0 commit comments

Comments
 (0)
0