8000 :sparkles: [53543] Allow to close the messenger transport · symfony/symfony@619bffb · GitHub
[go: up one dir, main page]

Skip to content

Commit 619bffb

Browse files
committed
✨ [53543] Allow to close the messenger transport
1 parent 5ce08a2 commit 619bffb

File tree

8 files changed

+59
-10
lines changed

8 files changed

+59
-10
lines changed

src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php

+7-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use AsyncAws\Core\Exception\Http\HttpException;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Exception\TransportException;
17+
use Symfony\Component\Messenger\Transport\CloseableTransportInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
@@ -27,7 +28,7 @@
2728
/**
2829
* @author Jérémy Derussé <jeremy@derusse.com>
2930
*/
30-
class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface
31+
class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface, ResetInterface
3132
{
3233
private SerializerInterface $serializer;
3334

@@ -91,6 +92,11 @@ public function reset(): void
9192
}
9293
}
9394

95+
public function close(): void
96+
{
97+
$this->reset();
98+
}
99+
94100
private function getReceiver(): MessageCountAwareInterface&ReceiverInterface
95101
{
96102
return $this->receiver ??= new AmazonSqsReceiver($this->connection, $this->serializer);

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php

+7-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Mes 8000 senger\Transport\CloseableTransportInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1617
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
1718
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -22,7 +23,7 @@
2223
/**
2324
* @author Nicolas Grekas <p@tchwork.com>
2425
*/
25-
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
26+
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface
2627
{
2728
private SerializerInterface $serializer;
2829
private AmqpReceiver $receiver;
@@ -70,6 +71,11 @@ public function getMessageCount(): int
7071
return $this->getReceiver()->getMessageCount();
7172
}
7273

74+
public function close(): void
75+
{
76+
$this->connection->clear();
77+
}
78+
7379
private function getReceiver(): AmqpReceiver
7480
{
7581
return $this->receiver ??= new AmqpReceiver($this->connection, $this->serializer);

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ private function clearWhenDisconnected(): void
551551
}
552552
}
553553

554-
private function clear(): void
554+
public function clear(): void
555555
{
556556
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
557557
$this->amqpQueues = [];

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

+6
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ public function reset(): void
8080
$this->doMysqlCleanup = false;
8181
}
8282

83+
public function close(): void
84+
{
85+
$this->driverConnection->close();
86+
$this->reset();
87+
}
88+
8389
public function getConfiguration(): array
8490
{
8591
return $this->configuration;

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php

+7-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Doctrine\DBAL\Schema\Schema;
1616
use Doctrine\DBAL\Schema\Table;
1717
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Transport\CloseableTransportInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
2021
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
@@ -25,7 +26,7 @@
2526
/**
2627
* @author Vincent Touzet <vincent.touzet@gmail.com>
2728
*/
28-
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface, KeepaliveReceiverInterface
29+
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface, KeepaliveReceiverInterface
2930
{
3031
private DoctrineReceiver $receiver;
3132
private DoctrineSender $sender;
@@ -81,6 +82,11 @@ public function setup(): void
8182
$this->connection->setup();
8283
}
8384

85+
public function close(): void
86+
{
87+
$this->connection->close();
88+
}
89+
8490
/**
8591
* Adds the Table to the Schema if this transport uses this connection.
8692
*/

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

+11-5
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ class Connection
5555
'ssl' => null, // see https://php.net/context.ssl
5656
];
5757

58-
private \Redis|Relay|\RedisCluster|\Closure $redis;
58+
private \Redis|Relay|\RedisCluster|null $redis = null;
59+
private \Closure $redisInitializer;
5960
private string $stream;
6061
private string $queue;
6162
private string $group;
@@ -112,9 +113,9 @@ public function __construct(array $options, \Redis|Relay|\RedisCluster|null $red
112113

113114
if ((\is_array($host) && null === $sentinelMaster) || $redis instanceof \RedisCluster) {
114115
$hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
115-
$this->redis = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
116+
$this->redisInitializer = static fn () => self::initializeRedisCluster($redis, $hosts, $auth, $options);
116117
} else {
117-
$this->redis = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
118+
$this->redisInitializer = static function () use ($redis, $sentinelMaster, $host, $port, $options, $auth) {
118119
if (null !== $sentinelMaster) {
119120
$sentinelClass = \extension_loaded('redis') ? \RedisSentinel::class : Sentinel::class;
120121
$hostIndex = 0;
@@ -737,10 +738,15 @@ private function rawCommand(string $command, ...$arguments): mixed
737738

738739
private function getRedis(): \Redis|Relay|\RedisCluster
739740
{
740-
if ($this->redis instanceof \Closure) {
741-
$this->redis = ($this->redis)();
741+
if (!$this->redis) {
742+
$this->redis = ($this->redisInitializer)();
742743
}
743744

744745
return $this->redis;
745746
}
747+
748+
public function close(): void
749+
{
750+
$this->redis = null;
751+
}
746752
}

src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php

+7-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\CloseableTransportInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1617
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1718
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -23,7 +24,7 @@
2324
* @author Alexander Schranz <alexander@sulu.io>
2425
* @author Antoine Bluchet <soyuka@gmail.com>
2526
*/
26-
class RedisTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface
27+
class RedisTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, CloseableTransportInterface, MessageCountAwareInterface
2728
{
2829
private SerializerInterface $serializer;
2930
private RedisReceiver $receiver;
@@ -71,6 +72,11 @@ public function getMessageCount(): int
7172
return $this->getReceiver()->getMessageCount();
7273
}
7374

75+
public function close(): void
76+
{
77+
$this->connection->close();
78+
}
79+
7480
private function getReceiver(): RedisReceiver
7581
{
7682
return $this->receiver ??= new RedisReceiver($this->connection, $this->serializer);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Symfony\Component\Messenger\Transport;
6+
7+
interface CloseableTransportInterface
8+
{
9+
/**
10+
* Close the transport.
11+
*/
12+
public function close(): void;
13+
}

0 commit comments

Comments
 (0)
0