8000 Enhance MessengerPass to filter consumable receivers using is_consuma… · symfony/symfony@1466aab · GitHub
[go: up one dir, main page]

Skip to content

Commit 1466aab

Browse files
committed
Enhance MessengerPass to filter consumable receivers using is_consumable tag attribute
Resolves #51556
1 parent c78a9ec commit 1466aab

File tree

3 files changed

+41
-7
lines changed

3 files changed

+41
-7
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2286,6 +2286,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
22862286
->addTag('messenger.receiver', [
22872287
'alias' => $name,
22882288
'is_failure_transport' => \in_array($name, $failureTransports),
2289+
'is_consumable' =>!str_starts_with($transport['dsn'], 'sync://'),
22892290
])
22902291
;
22912292
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);

src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds):
274274
}
275275
}
276276

277+
$consumableReceiverNames = [];
277278
foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) {
278279
$receiverClass = $this->getServiceClass($container, $id);
279280
if (!is_subclass_of($receiverClass, ReceiverInterface::class)) {
@@ -289,6 +290,9 @@ private function registerReceivers(ContainerBuilder $container, array $busIds):
289290
$failureTransportsMap[$tag['alias']] = $receiverMapping[$id];
290291
}
291292
}
293+
if (isset($tag['is_consumable']) && $tag['is_consumable']) {
294+
$consumableReceiverNames[] = $tag['alias'] ?? $id;
295+
}
292296
}
293297
}
294298

@@ -314,7 +318,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds):
314318
$consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus'));
315319
}
316320

317-
$consumeCommandDefinition->replaceArgument(4, array_values($receiverNames));
321+
$consumeCommandDefinition->replaceArgument(4, $consumableReceiverNames);
318322
try {
319323
$consumeCommandDefinition->replaceArgument(6, $busIds);
320324
} catch (OutOfBoundsException) {

src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Bridge\PhpUnit\ExpectDeprecationTrait;
16+
use Symfony\Component\Console\Command\Command;
1617
use Symfony\Component\DependencyInjection\ChildDefinition;
1718
use Symfony\Component\DependencyInjection\Compiler\AttributeAutoconfigurationPass;
1819
use Symfony\Component\DependencyInjection\Compiler\ResolveChildDefinitionsPass;
@@ -453,7 +454,7 @@ public function testItRegistersReceivers()
453454
{
454455
$container = $this->getContainerBuilder();
455456
$container->register('message_bus', MessageBusInterface::class)->addTag('messenger.bus');
456-
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp']);
457+
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp', 'is_consumable' => true]);
457458

458459
(new MessengerPass())->process($container);
459460

@@ -464,7 +465,7 @@ public function testItRegistersReceiversWithoutTagName()
464465
{
465466
$container = $this->getContainerBuilder();
466467
$container->register('message_bus', MessageBusInterface::class)->addTag('messenger.bus');
467-
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver');
468+
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['is_consumable' => true]);
468469

469470
(new MessengerPass())->process($container);
470471

@@ -482,8 +483,8 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
482483
null,
483484
]);
484485

485-
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp']);
486-
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', ['alias' => 'dummy']);
486+
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp', 'is_consumable' => true]);
487+
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', ['alias' => 'dummy', 'is_consumable' => true]);
487488

488489
(new MessengerPass())->process($container);
489490

@@ -498,14 +499,42 @@ public function testItSetsTheReceiverNamesOnTheSetupTransportsCommand()
498499
null,
499500
]);
500501

501-
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp']);
502-
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', ['alias' => 'dummy']);
502+
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp', 'is_consumable' => true]);
503+
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', ['alias' => 'dummy', 'is_consumable' => true]);
503504

504505
(new MessengerPass())->process($container);
505506

506507
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_setup_transports')->getArgument(1));
507508
}
508509

510+
public function testOnlyConsumableTransportsAreAddedToConsumeCommand()
511+
{
512+
$container = new ContainerBuilder();
513+
514+
$container->register('messenger.transport.async', DummyReceiver::class)
515+
->addTag('messenger.receiver', ['alias' => 'async', 'is_consumable' => true]);
516+
$container->register('messenger.transport.sync', DummyReceiver::class)
517+
->addTag('messenger.receiver', ['alias' => 'sync', 'is_consumable' => false]);
518+
$container->register('messenger.receiver_locator', ServiceLocator::class)
519+
->setArguments([[]]);
520+
521+
$container->register('console.command.messenger_consume_messages', Command::class)
522+
->setArguments([
523+
null,
524+
null,
525+
null,
526+
null,
527+
[],
528+
]);
529+
530+
(new MessengerPass())->process($conta 7D0B iner);
531+
532+
$this->assertSame(
533+
['async'],
534+
$container->getDefinition('console.command.messenger_consume_messages')->getArgument(4)
535+
);
536+
}
537+
509538
/**
510539
* @group legacy
511540
*/

0 commit comments

Comments
 (0)
0