8000 [Mesenger] Add support for reseting container services between 2 mess… · symfony/symfony@ae3dc21 · GitHub
[go: up one dir, main page]

Skip to content

Commit ae3dc21

Browse files
committed
[Mesenger] Add support for reseting container services between 2 messages
Without this patch, services are not resetted. For example Monolog Finger Cross handler is never reset nor flushed. So if the first message trigger and "error" level message, all others message will log and overflow the buffer. Usage with framework: ```yaml framework: messenger: transports: async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' reset_on_message: true failed: 'doctrine://default?queue_name=failed' sync: 'sync://' ```
1 parent c151f76 commit ae3dc21

File tree

13 files changed

+129
-3
lines changed

13 files changed

+129
-3
lines changed

src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
CHANGELOG
22
=========
33

4+
5.4
5+
---
6+
* Add support for resting container services after each messenger messages.
7+
48
5.3
59
---
610

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1325,6 +1325,7 @@ function ($a) {
13251325
->fixXmlConfig('option')
13261326
->children()
13271327
->scalarNode('dsn')->end()
1328+
->booleanNode('reset_on_message')->defaultFalse()->info('Reset the container services after each message.')->end()
13281329
->scalarNode('serializer')->defaultNull()->info('Service id of a custom serializer to use.')->end()
13291330
->arrayNode('options')
13301331
->normalizeKeys(false)

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1983,6 +1983,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19831983

19841984
$senderAliases = [];
19851985
$transportRetryReferences = [];
1986+
$transportNamesForResetServices = [];
19861987
foreach ($config['transports'] as $name => $transport) {
19871988
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
19881989
$transportDefinition = (new Definition(TransportInterface::class))
@@ -2011,6 +2012,18 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20112012

20122013
$transportRetryReferences[$name] = new Reference($retryServiceId);
20132014
}
2015+
if ($transport['reset_on_message']) {
2016+
$transportNamesForResetServices[] = $name;
2017+
}
2018+
}
2019+
2020+
if ($transportNamesForResetServices) {
2021+
$container
2022+
->getDefinition('messenger.listener.reset_services')
2023+
->replaceArgument(1, $transportNamesForResetServices)
2024+
;
2025+
} else {
2026+
$container->removeDefinition('messenger.listener.reset_services');
20142027
}
20152028

20162029
$senderReferences = [];

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
1919
use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
2020
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
21+
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2122
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
2223
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
2324
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
@@ -193,6 +194,13 @@
193194
->set('messenger.listener.stop_worker_on_sigterm_signal_listener', StopWorkerOnSigtermSignalListener::class)
194195
->tag('kernel.event_subscriber')
195196

197+
->set('messenger.listener.reset_services', ResetServicesListener::class)
198+
->args([
199+
service('services_resetter'),
200+
abstract_arg('receivers names'),
201+
])
202+
->tag('kernel.event_subscriber')
203+
196204
->set('messenger.routable_message_bus', RoutableMessageBus::class)
197205
->args([
198206
abstract_arg('message bus locator'),

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,7 @@
503503
<xsd:attribute name="serializer" type="xsd:string" />
504504
<xsd:attribute name="dsn" type="xsd:string" />
505505
<xsd:attribute name="failure-transport" type="xsd:string" />
506+
<xsd:attribute name="reset-on-message" type="xsd:boolean" />
506507
</xsd:complexType>
507508

508509
<xsd:complexType name="messenger_retry_strategy">

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
'default' => 'amqp://localhost/%2f/messages',
1212
'customised' => [
1313
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
14+
'reset_on_message' => true,
1415
'options' => ['queue' => ['name' => 'Queue']],
1516
'serializer' => 'messenger.transport.native_php_serializer',
1617
'retry_strategy' => [

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<framework:messenger failure-transport="failed">
1111
<framework:serializer default-serializer="messenger.transport.symfony_serializer" />
1212
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
13-
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer">
13+
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer" reset-on-message="true">
1414
<framework:options>
1515
<framework:queue>
1616
<framework:name>Queue</framework:name>

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ framework:
88
default: 'amqp://localhost/%2f/messages'
99
customised:
1010
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
11+
reset_on_message: true
1112
options:
1213
queue:
1314
name: Queue

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,7 @@ public function testMessenger()
714714
$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
715715
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
716716
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
717+
$this->assertFalse($container->hasDefinition('messenger.listener.reset_services'));
717718
}
718719

719720
public function testMessengerMultipleFailureTransports()
@@ -858,6 +859,9 @@ public function testMessengerTransports()
858859
return array_shift($values);
859860
}, $failureTransports);
860861
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
862+
863+
$this->assertTrue($container->hasDefinition('messenger.listener.reset_services'));
864+
$this->assertSame(['customised'], $container->getDefinition('messenger.listener.reset_services')->getArgument(1));
861865
}
862866

863867
public function testMessengerRouting()

src/Symfony/Bundle/FrameworkBundle/composer.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
"symfony/http-client": "^4.4|^5.0",
4949
"symfony/lock": "^4.4|^5.0",
5050
"symfony/mailer": "^5.2",
51-
"symfony/messenger": "^5.2",
51+
"symfony/messenger": "^5.4",
5252
"symfony/mime": "^4.4|^5.0",
5353
"symfony/notifier": "^5.3",
5454
"symfony/allmysms-notifier": "^5.3",
@@ -115,7 +115,7 @@
115115
"symfony/form": "<5.2",
116116
"symfony/lock": "<4.4",
117117
"symfony/mailer": "<5.2",
118-
"symfony/messenger": "<4.4",
118+
"symfony/messenger": "<5.4",
119119
"symfony/mime": "<4.4",
120120
"symfony/property-info": "<4.4",
121121
"symfony/property-access": "<5.3",

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
CHANGELOG
22
=========
33

4+
5.4
5+
---
6+
* Add support for resting container services after each messages.
7+
48
5.3
59
---
610

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\EventListener;
13+
14+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
15+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
16+
use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent;
17+
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
19+
20+
/**
21+
* @author Grégoire Pineau <lyrixx@lyrixx.info>
22+
*/
23+
class ResetServicesListener implements EventSubscriberInterface
24+
{
25+
private $servicesResetter;
26+
private $receiversName;
27+
28+
public function __construct(ServicesResetter $servicesResetter, array $receiversName)
29+
{
30+
$this->servicesResetter = $servicesResetter;
31+
$this->receiversName = $receiversName;
32+
}
33+
34+
public function resetServices(AbstractWorkerMessageEvent $event)
35+
{
36+
if (!\in_array($event->getReceiverName(), $this->receiversName, true)) {
37+
return;
38+
}
39+
40+
$this->servicesResetter->reset();
41+
}
42+
43+
public static function getSubscribedEvents()
44+
{
45+
return [
46+
WorkerMessageHandledEvent::class => ['resetServices'],
47+
WorkerMessageFailedEvent::class => ['resetServices'],
48+
];
49+
}
50+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\EventListener;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent;
18+
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
19+
20+
class ResetServicesListenerTest extends TestCase
21+
{
22+
public function provideTests(): iterable
23+
{
24+
yield ['foo', true];
25+
yield ['bar', false];
26+
}
27+
28+
/** @dataProvider provideTests */
29+
public function test(string $receiverName, bool $shouldReset)
30+
{
31+
$servicesResetter = $this->createMock(ServicesResetter::class);
32+
$servicesResetter->expects($shouldReset ? $this->once() : $this->never())->method('reset');
33+
34+
$event = new class(new Envelope(new \stdClass()), $receiverName) extends AbstractWorkerMessageEvent {};
35+
36+
$memoryLimitListener = new ResetServicesListener($servicesResetter, ['foo']);
37+
$memoryLimitListener->resetServices($event);
38+
}
39+
}

0 commit comments

Comments
 (0)
0