8000 [Messenger] Add TransportInterface as first class citizen sender+receiver by nicolas-grekas · Pull Request #27164 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Add TransportInterface as first class citizen sender+receiver #27164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[Messenger] Add TransportInterface as first class citizen sender+rece…
…iver
  • Loading branch information
nicolas-grekas committed May 7, 2018
commit 379b8eb56b559acab7d53acad2278277b6846439
10000
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\PropertyAccess\PropertyAccessor;
use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface;
use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface;
Expand Down Expand Up @@ -1506,19 +1507,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".');
}

$senderDefinition = (new Definition(SenderInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createSender'))
->setArguments(array($transport['dsn'], $transport['options']))
->addTag('messenger.sender', array('name' => $name))
;
$container->setDefinition('messenger.sender.'.$name, $senderDefinition);

$receiverDefinition = (new Definition(ReceiverInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createReceiver'))
$transportDefinition = (new Definition(TransportInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
->setArguments(array($transport['dsn'], $transport['options']))
->addTag('messenger.receiver', array('name' => $name))
->addTag('messenger.sender', array('name' => $name))
;
$container->setDefinition('messenger.receiver.'.$name, $receiverDefinition);
$container->setDefinition('messenger.transport.'.$name, $transportDefinition);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,30 +533,20 @@ public function testMessenger()
public function testMessengerTransports()
{
$container = $this->createContainerFromFile('messenger_transports');
$this->assertTrue($container->hasDefinition('messenger.sender.default'));
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.sender.default')->getTag('messenger.sender'));
$this->assertTrue($container->hasDefinition('messenger.receiver.default'));
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.receiver.default')->getTag('messenger.receiver'));

$this->assertTrue($container->hasDefinition('messenger.sender.customised'));
$senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory();
$senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createSender'), $senderFactory);
$this->assertCount(2, $senderArguments);
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]);
$this->assertSame(array('queue' => array('name' => 'Queue')), $senderArguments[1]);

$this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
$receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory();
$receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createReceiver'), $receiverFactory);
$this->assertCount(2, $receiverArguments);
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]);
$this->assertSame(array('queue' => array('name' => 'Queue')), $receiverArguments[1]);
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender'));

$this->assertTrue($container->hasDefinition('messenger.transport.customised'));
$transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
$transportArguments = $container->getDefinition('messenger.transport.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createTransport'), $transportFactory);
$this->assertCount(2, $transportArguments);
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $transportArguments[0]);
$this->assertSame(array('queue' => array('name' => 'Queue')), $transportArguments[1]);

$this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory'));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function __construct(EncoderInterface $messageEncoder, Connection $connec
/**
* {@inheritdoc}
*/
public function send($message)
public function send($message): void
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not fond of the idea of enforcing void as a result. Senders might return something later on, we shouldn’t close that door IMHO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing void should be ok for the bc policy. See symfony/symfony-docs#9717

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

EDBE
{
$encodedMessage = $this->messageEncoder->encode($message);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
* @author Nicolas Grekas <p@tchwork.com>
*/
class AmqpTransport implements TransportInterface
{
private $encoder;
private $decoder;
private $dsn;
private $options;
private $debug;
private $connection;
private $receiver;
private $sender;

public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, string $dsn, array $options, bool $debug)
{
$this->encoder = $encoder;
$this->decoder = $decoder;
$this->dsn = $dsn;
$this->options = $options;
$this->debug = $debug;
}

/**
* {@inheritdoc}
*/
public function receive(callable $handler): void
{
($this->receiver ?? $this->getReceiver())->receive($hander);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems ternary operator fits better here (same below) ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean, doesn't feel like to me :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @yceruto mean next variant:

$this->receiver ? $this->receiver->receive($hander) ? $this->getReceiver()->receive($hander)

Copy link
Member
@yceruto yceruto May 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant ($this->receiver ?: $this->getReceiver())-> instead, as $this->receiver is always defined I don't see the need for ??, but probably I'm missing something here :)

Copy link
Contributor
@ogizanagi ogizanagi May 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$this->receiver is null until $this->getReceiver() is called a first time. So makes sense to use the null-coalescing operator rather than ternary for this :)

Copy link
Member
@yceruto yceruto May 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the manual:

The null coalescing operator (??) has been added as syntactic sugar for the common case of needing to use a ternary in conjunction with isset(). It returns its first operand if IT EXISTS and is not NULL; otherwise it returns its second operand.

So ternary operator fits better in this case as $this->receiver always exists (i.e. this var is defined always), but NVM it's just a detail, ?? works fine too.

}

/**
* {@inheritdoc}
*/
public function stop(): void
{
($this->receiver ?? $this->getReceiver())->stop();
}

/**
* {@inheritdoc}
*/
public function send($message): void
{
($this->sender ?? $this->getSender())->send($message);
}

private function getReceiver()
{
return $this->receiver = new AmqpReceiver($this->decoder, $this->connection ?? $this->getConnection());
}

private function getSender()
{
return $this->sender = new AmqpSender($this->encoder, $this->connection ?? $this->getConnection());
}

private function getConnection()
{
return $this->connection = new Connection($this->dsn, $this->options, $this->debug);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Transport\Factory\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
Expand All @@ -33,14 +32,9 @@ public function __construct(EncoderInterface $encoder, DecoderInterface $decoder
$this->debug = $debug;
}

public function createReceiver(string $dsn, array $options): ReceiverInterface
public function createTransport(string $dsn, array $options): TransportInterface
{
return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
}

public function createSender(string $dsn, array $options): SenderInterface
{
return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug));
return new AmqpTransport($this->encoder, $this->decoder, $dsn, $options, $thid->debug);
}

public function supports(string $dsn, array $options): bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ interface SenderInterface
*
* @param object $message
*/
public function send($message);
public function send($message): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Factory;

use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
namespace Symfony\Component\Messenger\Transport;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class ChainTransportFactory implements TransportFactoryInterface
class TransportFactory implements TransportFactoryInterface
{
private $factories;

Expand All @@ -29,22 +26,11 @@ public function __construct(iterable $factories)
$this->factories = $factories;
}

public function createReceiver(string $dsn, array $options): ReceiverInterface
{
foreach ($this->factories as $factory) {
if ($factory->supports($dsn, $options)) {
return $factory->createReceiver($dsn, $options);
}
}

throw new \InvalidArgumentException(sprintf('No transport supports the given DSN "%s".', $dsn));
}

public function createSender(string $dsn, array $options): SenderInterface
public function createTransport(string $dsn, array $options): TransportInterface
{
foreach ($this->factories as $factory) {
if ($factory->supports($dsn, $options)) {
return $factory->createSender($dsn, $options);
return $factory->createTransport($dsn, $options);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Factory;

use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
namespace Symfony\Component\Messenger\Transport;

/**
* Creates a Messenger transport.
Expand Down
21 changes: 21 additions & 0 deletions src/Symfony/Component/Messenger/Transport/TransportInterface.php
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport;

/**
* @author Nicolas Grekas <p@tchwork.com>
*
* @experimental in 4.1
*/
interface TransportInterface extends ReceiverInterface, SenderInterface
{
}
0