8000 [Messenger] Move container resetting after receiver acknowledging (in… · symfony/symfony@987baef · GitHub
[go: up one dir, main page]

Skip to content

Commit 987baef

Browse files
committed
[Messenger] Move container resetting after receiver acknowledging (in command)
1 parent 8bea384 commit 987baef

File tree

13 files changed

+141
-65
lines changed

13 files changed

+141
-65
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configurati 9E81 on.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,10 +1333,6 @@ function ($a) {
13331333
->fixXmlConfig('option')
13341334
->children()
13351335
->scalarNode('dsn')->end()
1336-
->booleanNode('reset_on_message')
1337-
->defaultFalse()
1338-
->info('Reset container services after each message. Turn it on when the transport is async and run in a worker.')
1339-
->end()
13401336
->scalarNode('serializer')->defaultNull()->info('Service id of a custom serializer to use.')->end()
13411337
->arrayNode('options')
13421338
->normalizeKeys(false)

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1977,7 +1977,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19771977

19781978
$senderAliases = [];
19791979
$transportRetryReferences = [];
1980-
$transportNamesForResetServices = [];
19811980
foreach ($config['transports'] as $name => $transport) {
19821981
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
19831982
$transportDefinition = (new Definition(TransportInterface::class))
@@ -2006,18 +2005,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20062005

20072006
$transportRetryReferences[$name] = new Reference($retryServiceId);
20082007
}
2009-
if ($transport['reset_on_message']) {
2010-
$transportNamesForResetServices[] = $name;
2011-
}
2012-
}
2013-
2014-
if ($transportNamesForResetServices) {
2015-
$container
2016-
->getDefinition('messenger.listener.reset_services')
2017-
->replaceArgument(1, $transportNamesForResetServices)
2018-
;
2019-
} else {
2020-
$container->removeDefinition('messenger.listener.reset_services');
20212008
}
20222009

20232010
$senderReferences = [];

src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
service('event_dispatcher'),
143143
service('logger')->nullOnInvalid(),
144144
[], // Receiver names
145+
service('services_resetter')
145146
])
146147
->tag('console.command')
147148
->tag('monolog.logger', ['channel' => 'messenger'])

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
1919
use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
2020
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
21-
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2221
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
2322
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
2423
use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener;
@@ -198,13 +197,6 @@
198197
->set('messenger.listener.stop_worker_on_stop_exception_listener', StopWorkerOnCustomStopExceptionListener::class)
199198
->tag('kernel.event_subscriber')
200199

201-
->set('messenger.listener.reset_services', ResetServicesListener::class)
202-
->args([
203-
service('services_resetter'),
204-
abstract_arg('receivers names'),
205-
])
206-
->tag('kernel.event_subscriber')
207-
208200
->set('messenger.routable_message_bus', RoutableMessageBus::class)
209201
->args([
210202
abstract_arg('message bus locator'),

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,6 @@
505505
<xsd:attribute name="serializer" type="xsd:string" />
506506
<xsd:attribute name="dsn" type="xsd:string" />
507507
<xsd:attribute name="failure-transport" type="xsd:string" />
508-
<xsd:attribute name="reset-on-message" type="xsd:boolean" />
509508
</xsd:complexType>
510509

511510
<xsd:complexType name="messenger_retry_strategy">

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
'default' => 'amqp://localhost/%2f/messages',
1212
'customised' => [
1313
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
14-
'reset_on_message' => true,
1514
'options' => ['queue' => ['name' => 'Queue']],
1615
'serializer' => 'messenger.transport.native_php_serializer',
1716
'retry_strategy' => [

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<framework:messenger failure-transport="failed">
1111
<framework:serializer default-serializer="messenger.transport.symfony_serializer" />
1212
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
13-
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer" reset-on-message="true">
13+
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer">
1414
<framework:options>
1515
<framework:queue>
1616
<framework:name>Queue</framework:name>

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ framework:
88
default: 'amqp://localhost/%2f/messages'
99
customised:
1010
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
11-
reset_on_message: true
1211
options:
1312
queue:
1413
name: Queue

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,6 @@ public function testMessenger()
722722
$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
723723
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
724724
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
725-
$this->assertFalse($container->hasDefinition('messenger.listener.reset_services'));
726725
}
727726

728727
public function testMessengerMultipleFailureTransports()
@@ -867,9 +866,6 @@ public function testMessengerTransports()
867866
return array_shift($values);
868867
}, $failureTransports);
869868
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
870-
871-
$this->assertTrue($container->hasDefinition('messenger.listener.reset_services'));
872-
$this->assertSame(['customised'], $container->getDefinition('messenger.listener.reset_services')->getArgument(1));
873869
}
874870

875871
public function testMessengerRouting()

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
use Symfony\Component\Console\Question\ChoiceQuestion;
2424
use Symfony\Component\Console\Style\SymfonyStyle;
2525
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
26+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
27+
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2628
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
2729
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
2830
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
@@ -43,14 +45,22 @@ class ConsumeMessagesCommand extends Command
4345
private $logger;
4446
private $receiverNames;
4547
private $eventDispatcher;
46-
47-
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [])
48-
{
48+
private $servicesResetter;
49+
50+
public function __construct(
51+
RoutableMessageBus $routableBus,
52+
ContainerInterface $receiverLocator,
53+
EventDispatcherInterface $eventDispatcher,
54+
LoggerInterface $logger = null,
55+
array $receiverNames = [],
56+
$servicesResetter = null
57+
) {
4958
$this->routableBus = $routableBus;
5059
$this->receiverLocator = $receiverLocator;
5160
$this->logger = $logger;
5261
$this->receiverNames = $receiverNames;
5362
$this->eventDispatcher = $eventDispatcher;
63+
$this->servicesResetter = $servicesResetter;
5464

5565
parent::__construct();
5666
}
@@ -72,6 +82,7 @@ protected function configure(): void
7282
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
7383
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED CA89 , 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
7484
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
85+
new InputOption('reset-services-on-message', null, InputOption::VALUE_NEGATABLE, 'Reset (or do not --no-reset-services-on-message) container services after each message', false),
7586
])
7687
->setDescription(self::$defaultDescription)
7788
->setHelp(<<<'EOF'
@@ -159,6 +170,18 @@ protected function execute(InputInterface $input, OutputInterface $output)
159170
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
160171
}
161172

173+
if (!$input->hasParameterOption(['--reset-services-on-message', '--no-reset-services-on-message'], true)) {
174+
trigger_deprecation('symfony/messenger', '5.4', 'Not setting either "--reset-services-on-message" nor "--no-reset-services-on-message" option explicitly is deprecated, its default value will change to true in 6.0.');
175+
}
176+
177+
if ($input->getOption('reset-services-on-message')) {
178+
if (null === $this->servicesResetter) {
179+
throw new RuntimeException(\sprintf('Please set a $servicesResetter with %s instance to use resetting services after each message.', ServicesResetter::class));
180+
}
181+
182+
$this->eventDispatcher->addSubscriber(new ResetServicesListener($this->servicesResetter));
183+
}
184+
162185
$stopsWhen = [];
163186
if ($limit = $input->getOption('limit')) {
164187
$stopsWhen[] = "processed {$limit} messages";

0 commit comments

Comments
 (0)
0