8000 feature #59862 [Messenger] Allow to close the transport connection (a… · symfony/symfony@89bae9c · GitHub
[go: up one dir, main page]

Skip to content

Commit 89bae9c

Browse files
committed
feature #59862 [Messenger] Allow to close the transport connection (andrew-demb)
This PR was squashed before being merged into the 7.3 branch. Discussion ---------- [Messenger] Allow to close the transport connection | Q | A | ------------- | --- | Branch? | 7.3 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | Fix #53543 | License | MIT <!-- Replace this notice by a description of your feature/bugfix. This will help reviewers and should be a good start for the documentation. Additionally (see https://symfony.com/releases): - Always add tests and ensure they pass. - Bug fixes must be submitted against the lowest maintained branch where they apply (lowest branches are regularly merged to upper ones so they get the fixes too). - Features and deprecations must be submitted against the latest branch. - For new features, provide some code snippets to help understand usage. - Changelog entry should follow https://symfony.com/doc/current/contributing/code/conventions.html#writing-a-changelog-entry - Never break backward compatibility (see https://symfony.com/bc). --> ~~1. Implemented the possibility to make messenger transports resettable~~ ~~2. Implemented reset Redis connection for Redis messenger transport~~ ~~This feature may lead to decreased performance for messenger consumers (because connection will be resetted between processing messages).~~ ~~One way to resolve it that I found - add configuration for resettable transports - aka "reset connection on kernel reset: true/false". For consumers it can be configured via env var to be "false", but for web "true".~~ ---- **UPD 2025-02-27**: According to the feedback, I changed the implementation to another one to help with our use case. Implemented a way to close messenger transport from the application, to allow free resources for long-running processes (like a long-running webserver) Commits ------- 9788aee [Messenger] Allow to close the transport connection
2 parents 6477041 + 9788aee commit 89bae9c

File tree

13 files changed

+57
-12
lines changed

13 files changed

+57
-12
lines changed

src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.3
55
---
66

7+
* Implement the `CloseableTransportInterface` to allow closing the transport
78
* Add new `queue_attributes` and `queue_tags` options for SQS queue creation
89

910
7.2

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use AsyncAws\Core\Exception\Http\HttpException;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Exception\TransportException;
17+
use Symfony\Component\Messenger\Transport\CloseableTransportInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
@@ -27,7 +28,7 @@
2728
/**
2829
* @author Jérémy Derussé <jeremy@derusse.com>
2930
*/
30-
class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface
31+
class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface, ResetInterface
3132
{
3233
private SerializerInterface $serializer;
3334

@@ -91,6 +92,11 @@ public function reset(): void
9192
}
9293
}
9394

95+
public function close(): void
96+
{
97+
$this->reset();
98+
}
99+
94100
private function getReceiver(): MessageCountAwareInterface&ReceiverInterface
95101
{
96102
return $this->receiver ??= new AmazonSqsReceiver($this->connection, $this->serializer);

src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
"php": ">=8.2",
2020
"async-aws/core": "^1.7",
2121
"async-aws/sqs": "^1.0|^2.0",
22-
"symfony/messenger": "^7 A3E2 .2",
22+
"symfony/messenger": "^7.3",
2323
"symfony/service-contracts": "^2.5|^3",
2424
"psr/log": "^1|^2|^3"
2525
},

src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.1
55
---
66

7+
* Implement the `CloseableTransportInterface` to allow closing the AMQP connection
78
* Add option `delay[arguments]` in the transport definition
89

910
6.0

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\CloseableTransportInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1617
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
1718
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -22,7 +23,7 @@
2223
/**
2324
* @author Nicolas Grekas <p@tchwork.com>
2425
*/
25-
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
26+
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface
2627
{
2728
private SerializerInterface $serializer;
2829
private AmqpReceiver $receiver;
@@ -70,6 +71,11 @@ public function getMessageCount(): int
7071
return $this->getReceiver()->getMessageCount();
7172
}
7273

74+
public function close(): void
75+
{
76+
$this->connection->clear();
77+
}
78+
7379
private function getReceiver(): AmqpReceiver
7480
{
7581
return $this->receiver ??= new AmqpReceiver($this->connection, $this->serializer);

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ private function clearWhenDisconnected(): void
551551
}
552552
}
553553

554-
private function clear(): void
554+
public function clear(): void
555555
{
556556
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
557557
$this->amqpQueues = [];

src/Symfony/Component/Messenger/Bridge/Amqp/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"require": {
1919
"php": ">=8.2",
2020
"ext-amqp": "*",
21-
"symfony/messenger": "^6.4|^7.0"
21+
"symfony/messenger": "^7.3"
2222
},
2323
"require-dev": {
2424
"symfony/event-dispatcher": "^6.4|^7.0",

src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.3
55
---
66

7+
* Implement the `CloseableTransportInterface` to allow closing the Redis connection
78
* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying Redis that the job is still being processed, in order to avoid timeouts
89

910
6.3

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ class Connection
5555
'ssl' => null, // see https://php.net/context.ssl
5656
];
5757

58-
private \Redis|Relay|\RedisCluster|\Closure $redis;
58+
private \Redis|Relay|\RedisCluster|null $redis = null;
59+
private \Closure $redisInitializer;
5960
private string $stream;
6061
private string $queue;
6162
private string $group;
@@ -112,9 +113,9 @@ public function __construct(array $options, \Redis|Relay|\RedisCluster|null $red
112113

113114
if ((\is_array($host) && null === $sentinelMaster) || $redis instanceof \RedisCluster) {
114115
$hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
115-
$this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
116+
$this->redisInitializer = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
116117
} else {
117-
$this->redis = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
118+
$this->redisInitializer = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
118119
if (null !== $sentinelMaster) {
119120
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
120121
$hostIndex = 0;
@@ -737,10 +738,15 @@ private function rawCommand(string $command, ...$arguments): mixed
737738

738739
private function getRedis(): \Redis|Relay|\RedisCluster
739740
{
740-
if ($this->redis instanceof \Closure) {
741-
$this->redis = ($this->redis)();
741+
if (!$this->redis) {
742+
$this->redis = ($this->redisInitializer)();
742743
}
743744

744745
return $this->redis;
745746
}
747+
748+
public function close(): void
749+
{
750+
$this->redis = null;
751+
}
746752
}

src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\CloseableTransportInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1617
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1718
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -23,7 +24,7 @@
2324
* @author Alexander Schranz <alexander@sulu.io>
2425
* @author Antoine Bluchet <soyuka@gmail.com>
2526
*/
26-
class RedisTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface
27+
class RedisTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface
2728
{
2829
private SerializerInterface $serializer;
2930
private RedisReceiver $receiver;
@@ -71,6 +72,11 @@ public function getMessageCount(): int
7172
return $this->getReceiver()->getMessageCount();
7273
}
7374

75+
public function close(): void
76+
{
77+
$this->connection->close();
78+
}
79+
7480
private function getReceiver(): RedisReceiver
7581
{
7682
return $this->receiver ??= new RedisReceiver($this->connection, $this->serializer);

src/Symfony/Component/Messenger/Bridge/Redis/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"require": {
1919
"php": ">=8.2",
2020
"ext-redis": "*",
21-
"symfony/messenger": "^7.2"
21+
"symfony/messenger": "^7.3"
2222
},
2323
"require-dev": {
2424
"symfony/property-access": "^6.4|^7.0",

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.3
55
---
66

7+
* Add `CloseableTransportInterface` to allow closing the transport
78
* Add `SentForRetryStamp` that identifies whether a failed message was sent for retry
89
* Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp`
910

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport;
13+
14+
interface CloseableTransportInterface
15+
{
16+
public function close(): void;
17+
}

0 commit comments

Comments
 (0)
0