-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Add AMQP adapter #26632
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Messenger] Add AMQP adapter #26632
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,11 +12,13 @@ addons: | |
- language-pack-fr-base | ||
- ldap-utils | ||
- slapd | ||
- librabbitmq-dev | ||
|
||
env: | ||
global: | ||
- MIN_PHP=7.1.3 | ||
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php | ||
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Though, in RabbitMq's defaults, the default virtual host is named |
||
|
||
matrix: | ||
include: | ||
|
@@ -38,6 +40,7 @@ services: | |
- memcached | ||
- mongodb | ||
- redis-server | ||
- rabbitmq | ||
|
||
before_install: | ||
- | | ||
|
@@ -134,6 +137,11 @@ before_install: | |
- | | ||
# Install extra PHP extensions | ||
if [[ ! $skip ]]; then | ||
# Install librabbitmq | ||
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq-dev_0.5.2-2_amd64.deb | ||
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq1_0.5.2-2_amd64.deb | ||
sudo dpkg -i librabbitmq1_0.5.2-2_amd64.deb librabbitmq-dev_0.5.2-2_amd64.deb | ||
|
||
# install libsodium | ||
sudo add-apt-repository ppa:ondrej/php -y | ||
sudo apt-get update -q | ||
|
@@ -142,6 +150,7 @@ before_install: | |
tfold ext.apcu tpecl apcu-5.1.6 apcu.so $INI | ||
tfold ext.libsodium tpecl libsodium sodium.so $INI | ||
tfold ext.mongodb tpecl mongodb-1.4.0RC1 mongodb.so $INI | ||
tfold ext.amqp tpecl amqp-1.9.3 amqp.so $INI | ||
fi | ||
|
||
- | | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
<?php | ||
|
||
$container->loadFromExtension('framework', array( | ||
'messenger' => array( | ||
'adapters' => array( | ||
'default' => 'amqp://localhost/%2f/messages', | ||
'customised' => array( | ||
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name', | ||
'options' => array('queue_name' => 'Queue'), | ||
), | ||
), | ||
), | ||
)); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
<?xml version="1.0" encoding="utf-8" ?> | ||
<container xmlns="http://symfony.com/schema/dic/services" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xmlns:framework="http://symfony.com/schema/dic/symfony" | ||
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd | ||
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> | ||
|
||
<framework:config> | ||
<framework:messenger> | ||
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" /> | ||
<framework:adapter name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name"> | ||
<framework:option name="queue_name" value="Queue" /> | ||
</framework:adapter> | ||
</framework:messenger> | ||
</framework:config> | ||
</container> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
framework: | ||
messenger: | ||
adapters: | ||
default: 'amqp://localhost/%2f/messages' | ||
customised: | ||
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name' | ||
options: | ||
queue_name: Queue |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -517,6 +517,7 @@ public function testWebLink() | |
public function testMessenger() | ||
{ | ||
$container = $this->createContainerFromFile('messenger'); | ||
$this->assertTrue($container->hasDefinition('message_bus')); | ||
$this->assertFalse($container->hasDefinition('messenger.middleware.doctrine_transaction')); | ||
} | ||
|
||
|
@@ -532,6 +533,33 @@ public function testMessengerValidationDisabled() | |
$this->assertFalse($container->hasDefinition('messenger.middleware.validator')); | ||
} | ||
|
||
public function testMessengerAdapter() | ||
{ | ||
$container = $this->createContainerFromFile('messenger_adapter'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, the messenger key can be enabled even if the component is not part of the if (!class_exists(MessageBusInterface::class)) {
throw new LogicException('Messenger cannot be enabled as the Messenger component is not installed.');
} 😄 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, will handle that in another PR 😉 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You fixed it in #26816, thank you :) |
||
$this->assertTrue($container->hasDefinition('messenger.sender.default')); | ||
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender')); | ||
$this->assertTrue($container->hasDefinition('messenger.receiver.default')); | ||
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver')); | ||
|
||
$this->assertTrue($container->hasDefinition('messenger.sender.customised')); | ||
$senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory(); | ||
$senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments(); | ||
|
||
$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createSender'), $senderFactory); | ||
$this->assertCount(2, $senderArguments); | ||
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]); | ||
$this->assertEquals(array('queue_name' => 'Queue'), $senderArguments[1]); | ||
|
||
$this->assertTrue($container->hasDefinition('messenger.receiver.customised')); | ||
$receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory(); | ||
$receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments(); | ||
|
||
$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createReceiver'), $receiverFactory); | ||
$this->assertCount(2, $receiverArguments); | ||
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]); | ||
$this->assertEquals(array('queue_name' => 'Queue'), $receiverArguments[1]); | ||
} | ||
|
||
public function testTranslator() | ||
{ | ||
$container = $this->createContainerFromFile('full'); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Adapter\AmqpExt; | ||
|
||
use Symfony\Component\Messenger\Adapter\Factory\AdapterFactoryInterface; | ||
use Symfony\Component\Messenger\Transport\ReceiverInterface; | ||
use Symfony\Component\Messenger\Transport\SenderInterface; | ||
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface; | ||
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; | ||
|
||
/** | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
class AmqpAdapterFactory implements AdapterFactoryInterface | ||
{ | ||
private $encoder; | ||
private $decoder; | ||
private $debug; | ||
|
||
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug) | ||
{ | ||
$this->encoder = $encoder; | ||
$this->decoder = $decoder; | ||
$this->debug = $debug; | ||
} | ||
|
||
public function createReceiver(string $dsn, array $options): ReceiverInterface | ||
{ | ||
return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug)); | ||
} | ||
|
||
public function createSender(string $dsn, array $options): SenderInterface | ||
{ | ||
return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug)); | ||
} | ||
|
||
public function supports(string $dsn, array $options): bool | ||
{ | ||
return 0 === strpos($dsn, 'amqp://'); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Adapter\AmqpExt; | ||
|
||
class AmqpFactory | ||
{ | ||
public function createConnection(array $credentials): \AMQPConnection | ||
{ | ||
return new \AMQPConnection($credentials); | ||
} | ||
|
||
public function createChannel(\AMQPConnection $connection): \AMQPChannel | ||
{ | ||
return new \AMQPChannel($connection); | ||
} | ||
|
||
public function createQueue(\AMQPChannel $channel): \AMQPQueue | ||
{ | ||
return new \AMQPQueue($channel); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think these
unless there's an alternative? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's correct. We can't add the requirement in this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good to know! I'm not going to hold up the train for that, but probably a good idea to add at some point. |
||
} | ||
|
||
public function createExchange(\AMQPChannel $channel): \AMQPExchange | ||
{ | ||
return new \AMQPExchange($channel); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see no test requiring a true AMQP connection (or it misses an
@requires extension amqp
), is that needed right now?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, we use the
\AMQP*
classes within the tests.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then they need the
@requires extension amqp
annotation :)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Added 👍