8000 [Messenger] Add AMQP adapter by sroze · Pull Request #26632 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Add AMQP adapter #26632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[Messenger] Add AMQP adapter
  • Loading branch information
sroze committed Apr 12, 2018
commit 798c230ad5e0857b03752e3dbaf63ebd209d415e
9 changes: 9 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ addons:
- language-pack-fr-base
- ldap-utils
- slapd
- librabbitmq-dev
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see no test requiring a true AMQP connection (or it misses an @requires extension amqp), is that needed right now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, we use the \AMQP* classes within the tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then they need the @requires extension amqp annotation :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Added 👍


env:
global:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use messages as vhost (ie: removing the leading /). it's more common.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though, in RabbitMq's defaults, the default virtual host is named /. That's why I used it and I believe as it matches the default, this is the one to keep.


matrix:
include:
Expand All @@ -38,6 +40,7 @@ services:
- memcached
- mongodb
- redis-server
- rabbitmq

before_install:
- |
Expand Down Expand Up @@ -134,6 +137,11 @@ before_install:
- |
# Install extra PHP extensions
if [[ ! $skip ]]; then
# Install librabbitmq
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq-dev_0.5.2-2_amd64.deb
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq1_0.5.2-2_amd64.deb
sudo dpkg -i librabbitmq1_0.5.2-2_amd64.deb librabbitmq-dev_0.5.2-2_amd64.deb

# install libsodium
sudo add-apt-repository ppa:ondrej/php -y
sudo apt-get update -q
Expand All @@ -142,6 +150,7 @@ before_install:
tfold ext.apcu tpecl apcu-5.1.6 apcu.so $INI
tfold ext.libsodium tpecl libsodium sodium.so $INI
tfold ext.mongodb tpecl mongodb-1.4.0RC1 mongodb.so $INI
tfold ext.amqp tpecl amqp-1.9.3 amqp.so $INI
fi

- |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -970,12 +970,17 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
->arrayNode('messenger')
->info('Messenger configuration')
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
->fixXmlConfig('adapter')
->children()
->arrayNode('routing')
->useAttributeAsKey('message_class')
->beforeNormalization()
->always()
->then(function ($config) {
if (!is_array($config)) {
return array();
}

$newConfig = array();
foreach ($config as $k => $v) {
if (!is_int($k)) {
Expand Down Expand Up @@ -1010,6 +1015,28 @@ function ($a) {
->end()
->end()
->end()
->arrayNode('adapters')
->useAttributeAsKey('name')
->arrayPrototype()
->beforeNormalization()
->ifString()
->then(function (string $dsn) {
return array('dsn' => $dsn);
})
->end()
->fixXmlConfig('option')
->children()
->scalarNode('dsn')->end()
->arrayNode('options')
->normalizeKeys(false)
->useAttributeAsKey('name')
->defaultValue(array())
->prototype('variable')
->end()
->end()
->end()
->end()
->end()
->end()
->end()
->end()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,24 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
} else {
$container->removeDefinition('messenger.middleware.validator');
}

foreach ($config['adapters'] as $name => $adapter) {
$container->setDefinition('messenger.sender.'.$name, (new Definition(SenderInterface::class))->setFactory(array(
new Reference('messenger.adapter_factory'),
'createSender',
))->setArguments(array(
$adapter['dsn'],
$adapter['options'],
))->addTag('messenger.sender'));

$container->setDefinition('messenger.receiver.'.$name, (new Definition(ReceiverInterface::class))->setFactory(array(
new Reference('messenger.adapter_factory'),
'createReceiver',
))->setArguments(array(
$adapter['dsn'],
$adapter['options'],
))->addTag('messenger.receiver'));
}
}

private function registerCacheConfiguration(array $config, ContainerBuilder $container)
Expand Down
13 changes: 13 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,18 @@
<tag name="container.service_locator" />
<argument type="collection" />
</service>

<!-- Adapters -->
<service id="messenger.adapter_factory" class="Symfony\Component\Messenger\Adapter\Factory\ChainAdapterFactory">
<argument type="tagged" tag="messenger.adapter_factory" />
</service>

<service id="messenger.adapter.amqp.factory" class="Symfony\Component\Messenger\Adapter\AmqpExt\AmqpAdapterFactory">
<argument type="service" id="messenger.transport.default_encoder" />
<argument type="service" id="messenger.transport.default_decoder" />
<argument>%kernel.debug%</argument>

<tag name="messenger.adapter_factory" />
</service>
</services>
</container>
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@
<xsd:sequence>
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
</xsd:complexType>

Expand All @@ -368,6 +369,19 @@
<xsd:attribute name="service" type="xsd:string" use="required"/>
</xsd:complexType>

<xsd:complexType name="messenger_adapter">
<xsd:sequence>
<xsd:element name="option" type="messenger_adapter_option" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="dsn" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="messenger_adapter_option">
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="value" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="messenger_middleware">
<xsd:sequence>
<xsd:element name="validation" type="messenger_validation" minOccurs="0" maxOccurs="1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
'enabled' => !class_exists(FullStack::class),
),
),
'adapters' => array(),
),
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

$container->loadFromExtension('framework', array(
'messenger' => array(
'adapters' => array(
'default' => 'amqp://localhost/%2f/messages',
'customised' => array(
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
'options' => array('queue_name' => 'Queue'),
),
),
),
));
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger>
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" />
<framework:adapter name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name">
<framework:option name="queue_name" value="Queue" />
</framework:adapter>
</framework:messenger>
</framework:config>
</container>
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
framework:
messenger:
adapters:
default: 'amqp://localhost/%2f/messages'
customised:
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
options:
queue_name: Queue
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ public function testWebLink()
public function testMessenger()
{
$container = $this->createContainerFromFile('messenger');
$this->assertTrue($container->hasDefinition('message_bus'));
$this->assertFalse($container->hasDefinition('messenger.middleware.doctrine_transaction'));
}

Expand All @@ -532,6 +533,33 @@ public function testMessengerValidationDisabled()
$this->assertFalse($container->hasDefinition('messenger.middleware.validator'));
}

public function testMessengerAdapter()
{
$container = $this->createContainerFromFile('messenger_adapter');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the messenger key can be enabled even if the component is not part of the composer.json dev reqs?
I suspect the FrameworkExtension is missing something like:

if (!class_exists(MessageBusInterface::class)) {
    throw new LogicException('Messenger cannot be enabled as the Messenger component is not installed.');
}

😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, will handle that in another PR 😉

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You fixed it in #26816, thank you :)

$this->assertTrue($container->hasDefinition('messenger.sender.default'));
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
$this->assertTrue($container->hasDefinition('messenger.receiver.default'));
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));

$this->assertTrue($container->hasDefinition('messenger.sender.customised'));
$senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory();
$senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createSender'), $senderFactory);
$this->assertCount(2, $senderArguments);
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]);
$this->assertEquals(array('queue_name' => 'Queue'), $senderArguments[1]);

$this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
$receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory();
$receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createReceiver'), $receiverFactory);
$this->assertCount(2, $receiverArguments);
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]);
$this->assertEquals(array('queue_name' => 'Queue'), $receiverArguments[1]);
}

public function testTranslator()
{
$container = $this->createContainerFromFile('full');
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Adapter\AmqpExt;

use Symfony\Component\Messenger\Adapter\Factory\AdapterFactoryInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpAdapterFactory implements AdapterFactoryInterface
{
private $encoder;
private $decoder;
private $debug;

public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug)
{
$this->encoder = $encoder;
$this->decoder = $decoder;
$this->debug = $debug;
}

public function createReceiver(string $dsn, array $options): ReceiverInterface
{
return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
}

public function createSender(string $dsn, array $options): SenderInterface
{
return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug));
}

public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'amqp://');
}
}
35 changes: 35 additions & 0 deletions src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Adapter\AmqpExt;

class AmqpFactory
{
public function createConnection(array $credentials): \AMQPConnection
{
return new \AMQPConnection($credentials);
}

public function createChannel(\AMQPConnection $connection): \AMQPChannel
{
return new \AMQPChannel($connection);
}

public function createQueue(\AMQPChannel $channel): \AMQPQueue
{
return new \AMQPQueue($channel);
Copy link
@davidbarratt davidbarratt Apr 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these \AMQP classes are only in the amqp pecl extension? if so, it should be a requirement in composer.json
https://getcomposer.org/doc/04-schema.md#package-links
something like

"ext-amqp": "^1.0.0"

unless there's an alternative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. We can't add the requirement in this composer.json file as it's not a requirement for the entire component, but for its AMQP adapter only. Though you are right in pointing that we don't handle properly the case when the extension is not installed: we can add a nice exception message when trying to create the adapter while the extension does not exist.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good to know! I'm not going to hold up the train for that, but probably a good idea to add at some point.

}

public function createExchange(\AMQPChannel $channel): \AMQPExchange
{
return new \AMQPExchange($channel);
}
}
Loading
0