8000 [RedisMessengerBridge] Add a delete_after_ack option to automatically… · symfony/symfony@7c416a7 · GitHub
[go: up one dir, main page]

Skip to content

Commit 7c416a7

Browse files
committed
[RedisMessengerBridge] Add a delete_after_ack option to automatically clean up processed messages from memory
1 parent 6dc7d8b commit 7c416a7

File tree

3 files changed

+42
-0
lines changed

3 files changed

+42
-0
lines changed

src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ CHANGELOG
99
* Deprecated use of invalid options
1010
* Added ability to receive of old pending messages with new `redeliver_timeout`
1111
and `claim_interval` options.
12+
* Added a `delete_after_ack` option to the DSN as an alternative to
13+
`stream_max_entries` to avoid leaking memory.

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,21 @@ public function testMaxEntries()
307307
$connection->add('1', []);
308308
}
309309

310+
public function testDeleteAfterAck()
311+
{
312+
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();
313+
314+
$redis->expects($this->exactly(1))->method('xack')
315+
->with('queue', 'symfony', ['1'])
316+
->willReturn(1);
317+
$redis->expects($this->exactly(1))->method('xdel')
318+
->with('queue', ['1'])
319+
->willReturn(1);
320+
321+
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_ack=true', [], $redis); // 1 = always
322+
$connection->ack('1');
323+
}
324+
310325
public function testLastErrorGetsCleared()
311326
{
312327
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class Connection
3232
'group' => 'symfony',
3333
'consumer' => 'consumer',
3434
'auto_setup' => true,
35+
'delete_after_ack' => false,
3536
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
3637
'dbindex' => 0,
3738
'tls' => false,
@@ -49,6 +50,7 @@ class Connection
4950
private $redeliverTimeout;
5051
private $nextClaim = 0;
5152
private $claimInterval;
53+
private $deleteAfterAck;
5254
private $couldHavePendingMessages = true;
5355

5456
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@@ -81,6 +83,7 @@ public function __construct(array $configuration, array $connectionCredentials =
8183
$this->queue = $this->stream.'__queue';
8284
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
8385
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
86+
$this->deleteAfterAck = $configuration['delete_after_ack'] ?? self::DEFAULT_OPTIONS['delete_after_ack'];
8487
$this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
8588
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
8689
}
@@ -114,6 +117,12 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
114117
unset($redisOptions['stream_max_entries']);
115118
}
116119

120+
$deleteAfterAck = null;
121+
if (\array_key_exists('delete_after_ack', $redisOptions)) {
122+
$deleteAfterAck = filter_var($redisOptions['delete_after_ack'], FILTER_VALIDATE_BOOLEAN);
123+
unset($redisOptions['delete_after_ack']);
124+
}
125+
117126
$dbIndex = null;
118127
if (\array_key_exists('dbindex', $redisOptions)) {
119128
$dbIndex = filter_var($redisOptions['dbindex'], FILTER_VALIDATE_INT);
@@ -144,6 +153,7 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
144153
'consumer' => $redisOptions['consumer'] ?? null,
145154
'auto_setup' => $autoSetup,
146155
'stream_max_entries' => $maxEntries,
156+
'delete_after_ack' => $deleteAfterAck,
147157
'dbindex' => $dbIndex,
148158
'redeliver_timeout' => $redeliverTimeout,
149159
'claim_interval' => $claimInterval,
@@ -314,6 +324,9 @@ public function ack(string $id): void
314324
{
315325
try {
316326
$acknowledged = $this->connection->xack($this->stream, $this->group, [$id]);
327+
if ($this->deleteAfterAck) {
328+
$acknowledged = $this->connection->xdel($this->stream, [$id]);
329+
}
317330
} catch (\RedisException $e) {
318331
throw new TransportException($e->getMessage(), 0, $e);
319332
}
@@ -408,6 +421,18 @@ public function setup(): void
408421
$this->connection->clearLastError();
409422
}
410423

424+
if ($this->deleteAfterAck) {
425+
$groups = $this->connection->xinfo('GROUPS', $this->stream);
426+
if (
427+
// support for Redis extension version 5+
428+
(\is_array($groups) && 1 < \count($groups))
429+
// support for Redis extension version 4.x
430+
|| (\is_string($groups) && substr_count($groups, '"name"'))
431+
) {
432+
throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack can not be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
433+
}
434+
}
435+
411436
$this->autoSetup = false;
412437
}
413438

0 commit comments

Comments
 (0)
0