8000 minor #27164 [Messenger] Add TransportInterface as first class citize… · symfony/symfony@247cbb0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 247cbb0

Browse files
committed
minor #27164 [Messenger] Add TransportInterface as first class citizen sender+receiver (nicolas-grekas)
This PR was merged into the 4.1 branch. Discussion ---------- [Messenger] Add TransportInterface as first class citizen sender+receiver | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | no | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | - | License | MIT | Doc PR | - The current design misses an opportunity to reuse the same connection for the sender and the receiver parts of a transport. By making `TransportInterface` a first class citizen, we simplify the wiring logic, we allow sharing the same connection for both the sender and the receiver, and we provide a natural point to lazily create the connection. Live from Las Vegas :) ![image](https://user-images.githubusercontent.com/243674/39658543-93c7120c-4fca-11e8-9f11-797953b3ee98.png) Commits ------- 379b8eb [Messenger] Add TransportInterface as first class citizen sender+receiver
2 parents c3d4536 + 379b8eb commit 247cbb0

File tree

9 files changed

+130
-68
lines changed
  • 9 files changed

    +130
    -68
    lines changed

    src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

    Lines changed: 5 additions & 10 deletions
    Original file line numberDiff line numberDiff line change
    @@ -65,6 +65,7 @@
    6565
    use Symfony\Component\Messenger\MessageBusInterface;
    6666
    use Symfony\Component\Messenger\Transport\ReceiverInterface;
    6767
    use Symfony\Component\Messenger\Transport\SenderInterface;
    68+
    use Symfony\Component\Messenger\Transport\TransportInterface;
    6869
    use Symfony\Component\PropertyAccess\PropertyAccessor;
    6970
    use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface;
    7071
    use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface;
    @@ -1506,19 +1507,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
    15061507
    throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".');
    15071508
    }
    15081509

    1509-
    $senderDefinition = (new Definition(SenderInterface::class))
    1510-
    ->setFactory(array(new Reference('messenger.transport_factory'), 'createSender'))
    1511-
    ->setArguments(array($transport['dsn'], $transport['options']))
    1512-
    ->addTag('messenger.sender', array('name' => $name))
    1513-
    ;
    1514-
    $container->setDefinition('messenger.sender.'.$name, $senderDefinition);
    1515-
    1516-
    $receiverDefinition = (new Definition(ReceiverInterface::class))
    1517-
    ->setFactory(array(new Reference('messenger.transport_factory'), 'createReceiver'))
    1510+
    $transportDefinition = (new Definition(TransportInterface::class))
    1511+
    ->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
    15181512
    ->setArguments(array($transport['dsn'], $transport['options']))
    15191513
    ->addTag('messenger.receiver', array('name' => $name))
    1514+
    ->addTag('messenger.sender', array('name' => $name))
    15201515
    ;
    1521-
    $container->setDefinition('messenger.receiver.'.$name, $receiverDefinition);
    1516+
    $container->setDefinition('messenger.transport.'.$name, $transportDefinition);
    15221517
    }
    15231518
    }
    15241519

    src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

    Lines changed: 14 additions & 24 deletions
    Original file line numberDiff line numberDiff line change
    @@ -533,30 +533,20 @@ public function testMessenger()
    533533
    public function testMessengerTransports()
    534534
    {
    535535
    $container = $this->createContainerFromFile('messenger_transports');
    536-
    $this->assertTrue($container->hasDefinition('messenger.sender.default'));
    537-
    $this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
    538-
    $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.sender.default')->getTag('messenger.sender'));
    539-
    $this->assertTrue($container->hasDefinition('messenger.receiver.default'));
    540-
    $this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));
    541-
    $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.receiver.default')->getTag('messenger.receiver'));
    542-
    543-
    $this->assertTrue($container->hasDefinition('messenger.sender.customised'));
    544-
    $senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory();
    545-
    $senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments();
    546-
    547-
    $this->assertEquals(array(new Reference('messenger.transport_factory'), 'createSender'), $senderFactory);
    548-
    $this->assertCount(2, $senderArguments);
    549-
    $this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]);
    550-
    $this->assertSame(array('queue' => array('name' => 'Queue')), $senderArguments[1]);
    551-
    552-
    $this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
    553-
    $receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory();
    554-
    $receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments();
    555-
    556-
    $this->assertEquals(array(new Reference('messenger.transport_factory'), 'createReceiver'), $receiverFactory);
    557-
    $this->assertCount(2, $receiverArguments);
    558-
    $this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]);
    559-
    $this->assertSame(array('queue' => array('name' => 'Queue')), $receiverArguments[1]);
    536+
    $this->assertTrue($container->hasDefinition('messenger.transport.default'));
    537+
    $this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
    538+
    $this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender'));
    539+
    $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
    540+
    $this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender'));
    541+
    542+
    $this->assertTrue($container->hasDefinition('messenger.transport.customised'));
    543+
    $transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
    544+
    $transportArguments = $container->getDefinition('messenger.transport.customised')->getArguments();
    545+
    546+
    $this->assertEquals(array(new Reference('messenger.transport_factory'), 'createTransport'), $transportFactory);
    547+
    $this->assertCount(2, $transportArguments);
    548+
    $this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $transportArguments[0]);
    549+
    $this->assertSame(array('queue' => array('name' => 'Queue')), $transportArguments[1]);
    560550

    561551
    $this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory'));
    562552
    }

    src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

    Lines changed: 1 addition & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -33,7 +33,7 @@ public function __construct(EncoderInterface $messageEncoder, Connection $connec
    3333
    /**
    3434
    * {@inheritdoc}
    3535
    */
    36-
    public function send($message)
    36+
    public function send($message): void
    3737
    {
    3838
    $encodedMessage = $this->messageEncoder->encode($message);
    3939

    Lines changed: 79 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,79 @@
    1+
    <?php
    2+
    3+
    /*
    4+
    * This file is part of the Symfony package.
    5+
    *
    6+
    * (c) Fabien Potencier <fabien@symfony.com>
    7+
    *
    8+
    * For the full copyright and license information, please view the LICENSE
    9+
    * file that was distributed with this source code.
    10+
    */
    11+
    12+
    namespace Symfony\Component\Messenger\Transport\AmqpExt;
    13+
    14+
    use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
    15+
    use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
    16+
    use Symfony\Component\Messenger\Transport\TransportInterface;
    17+
    18+
    /**
    19+
    * @author Nicolas Grekas <p@tchwork.com>
    20+
    */
    21+
    class AmqpTransport implements TransportInterface
    22+
    {
    23+
    private $encoder;
    24+
    private $decoder;
    25+
    private $dsn;
    26+
    private $options;
    27+
    private $debug;
    28+
    private $connection;
    29+
    private $receiver;
    30+
    private $sender;
    31+
    32+
    public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, string $dsn, array $options, bool $debug)
    33+
    {
    34+
    $this->encoder = $encoder;
    35+
    $this->decoder = $decoder;
    36+
    $this->dsn = $dsn;
    37+
    $this->options = $options;
    38+
    $this->debug = $debug;
    39+
    }
    40+
    41+
    /**
    42+
    * {@inheritdoc}
    43+
    */
    44+
    public function receive(callable $handler): void
    45+
    {
    46+
    ($this->receiver ?? $this->getReceiver())->receive($hander);
    47+
    }
    48+
    49+
    /**
    50+
    * {@inheritdoc}
    51+
    */
    52+
    public function stop(): void
    53+
    {
    54+
    ($this->receiver ?? $this->getReceiver())->stop();
    55+
    }
    56+
    57+
    /**
    58+
    * {@inheritdoc}
    59+
    */
    60+
    public function send($message): void
    61+
    {
    62+
    ($this->sender ?? $this->getSender())->send($message);
    63+
    }
    64+
    65+
    private function getReceiver()
    66+
    {
    67+
    return $this->receiver = new AmqpReceiver($this->decoder, $this->connection ?? $this->getConnection());
    68+
    }
    69+
    70+
    private function getSender()
    71+
    {
    72+
    return $this->sender = new AmqpSender($this->encoder, $this->connection ?? $this->getConnection());
    73+
    }
    74+
    75+
    private function getConnection()
    76+
    {
    77+
    return $this->connection = new Connection($this->dsn, $this->options, $this->debug);
    78+
    }
    79+
    }

    src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php

    Lines changed: 4 additions & 10 deletions
    Original file line numberDiff line numberDiff line change
    @@ -11,11 +11,10 @@
    1111

    1212
    namespace Symfony\Component\Messenger\Transport\AmqpExt;
    1313

    14-
    use Symfony\Component\Messenger\Transport\Factory\TransportFactoryInterface;
    15-
    use Symfony\Component\Messenger\Transport\ReceiverInterface;
    16-
    use Symfony\Component\Messenger\Transport\SenderInterface;
    1714
    use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
    1815
    use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
    16+
    use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
    17+
    use Symfony\Component\Messenger\Transport\TransportInterface;
    1918

    2019
    /**
    2120
    * @author Samuel Roze <samuel.roze@gmail.com>
    @@ -33,14 +32,9 @@ public function __construct(EncoderInterface $encoder, DecoderInterface $decoder
    3332
    $this->debug = $debug;
    3433
    }
    3534

    36-
    public function createReceiver(string $dsn, array $options): ReceiverInterface
    35+
    public function createTransport(string $dsn, array $options): TransportInterface
    3736
    {
    38-
    return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
    39-
    }
    40-
    41-
    public function createSender(string $dsn, array $options): SenderInterface
    42-
    {
    43-
    return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug));
    37+
    return new AmqpTransport($this->encoder, $this->decoder, $dsn, $options, $thid->debug);
    4438
    }
    4539

    4640
    public function supports(string $dsn, array $options): bool

    src/Symfony/Component/Messenger/Transport/SenderInterface.php

    Lines changed: 1 addition & 1 deletion
    Original file line numberDiff line numberDiff line change
    @@ -23,5 +23,5 @@ interface SenderInterface
    2323
    *
    2424
    * @param object $message
    2525
    */
    26-
    public function send($message);
    26+
    public function send($message): void;
    2727
    }

    src/Symfony/Component/Messenger/Transport/Factory/ChainTransportFactory.php renamed to src/Symfony/Component/Messenger/Transport/TransportFactory.php

    Lines changed: 4 additions & 18 deletions
    Original file line numberDiff line numberDiff line change
    @@ -9,15 +9,12 @@
    99
    * file that was distributed with this source code.
    1010
    */
    1111

    12-
    namespace Symfony\Component\Messenger\Transport\Factory;
    13-
    14-
    use Symfony\Component\Messenger\Transport\ReceiverInterface;
    15-
    use Symfony\Component\Messenger\Transport\SenderInterface;
    12+
    namespace Symfony\Component\Messenger\Transport;
    1613

    1714
    /**
    1815
    * @author Samuel Roze <samuel.roze@gmail.com>
    1916
    */
    20-
    class ChainTransportFactory implements TransportFactoryInterface
    17+
    class TransportFactory implements TransportFactoryInterface
    2118
    {
    2219
    private $factories;
    2320

    @@ -29,22 +26,11 @@ public function __construct(iterable $factories)
    2926
    $this->factories = $factories;
    3027
    }
    3128

    32-
    public function createReceiver(string $dsn, array $options): ReceiverInterface
    33-
    {
    34-
    foreach ($this->factories as $factory) {
    35-
    if ($factory->supports($dsn, $options)) {
    36-
    return $factory->createReceiver($dsn, $options);
    37-
    }
    38-
    }
    39-
    40-
    throw new \InvalidArgumentException(sprintf('No transport supports the given DSN "%s".', $dsn));
    41-
    }
    42-
    43-
    public function createSender(string $dsn, array $options): SenderInterface
    29+
    public function createTransport(string $dsn, array $options): TransportInterface
    4430
    {
    4531
    foreach ($this->factories as $factory) {
    4632
    if ($factory->supports($dsn, $options)) {
    47-
    return $factory->createSender($dsn, $options);
    33+
    return $factory->createTransport($dsn, $options);
    4834
    }
    4935
    }
    5036

    src/Symfony/Component/Messenger/Transport/Factory/TransportFactoryInterface.php renamed to src/Symfony/Component/Messenger/Transport/TransportFactoryInterface.php

    Lines changed: 1 addition & 4 deletions
    Original file line numberDiff line numberDiff line change
    @@ -9,10 +9,7 @@
    99
    * file that was distributed with this source code.
    1010
    */
    1111

    12-
    namespace Symfony\Component\Messenger\Transport\Factory;
    13-
    14-
    use Symfony\Component\Messenger\Transport\ReceiverInterface;
    15-
    use Symfony\Component\Messenger\Transport\SenderInterface;
    12+
    namespace Symfony\Component\Messenger\Transport;
    1613

    1714
    /**
    1815
    * Creates a Messenger transport.
    Lines changed: 21 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,21 @@
    1+
    <?php
    2+
    3+
    /*
    4+
    * This file is part of the Symfony package.
    5+
    *
    6+
    * (c) Fabien Potencier <fabien@symfony.com>
    7+
    *
    8+
    * For the full copyright and license information, please view the LICENSE
    9+
    * file that was distributed with this source code.
    10+
    */
    11+
    12+
    namespace Symfony\Component\Messenger\Transport;
    13+
    14+
    /**
    15+
    * @author Nicolas Grekas <p@tchwork.com>
    16+
    *
    17+
    * @experimental in 4.1
    18+
    */
    19+
    interface TransportInterface extends ReceiverInterface, SenderInterface
    20+
    {
    21+
    }

    0 commit comments

    Comments
     (0)
    0