@@ -32,6 +32,7 @@ class Connection
32
32
'group ' => 'symfony ' ,
33
33
'consumer ' => 'consumer ' ,
34
34
'auto_setup ' => true ,
35
+ 'delete_after_ack ' => false ,
35
36
'stream_max_entries ' => 0 , // any value higher than 0 defines an approximate maximum number of stream entries
36
37
'dbindex ' => 0 ,
37
38
'tls ' => false ,
@@ -49,6 +50,7 @@ class Connection
49
50
private $ redeliverTimeout ;
50
51
private $ nextClaim = 0 ;
51
52
private $ claimInterval ;
53
+ private $ deleteAfterAck ;
52
54
private $ couldHavePendingMessages = true ;
53
55
54
56
public function __construct (array $ configuration , array $ connectionCredentials = [], array $ redisOptions = [], \Redis $ redis = null )
@@ -81,6 +83,7 @@ public function __construct(array $configuration, array $connectionCredentials =
81
83
$ this ->queue = $ this ->stream .'__queue ' ;
82
84
$ this ->autoSetup = $ configuration ['auto_setup ' ] ?? self ::DEFAULT_OPTIONS ['auto_setup ' ];
83
85
$ this ->maxEntries = $ configuration ['stream_max_entries ' ] ?? self ::DEFAULT_OPTIONS ['stream_max_entries ' ];
86
+ $ this ->deleteAfterAck = $ configuration ['delete_after_ack ' ] ?? self ::DEFAULT_OPTIONS ['delete_after_ack ' ];
84
87
$ this ->redeliverTimeout = ($ configuration ['redeliver_timeout ' ] ?? self ::DEFAULT_OPTIONS ['redeliver_timeout ' ]) * 1000 ;
85
88
$ this ->claimInterval = $ configuration ['claim_interval ' ] ?? self ::DEFAULT_OPTIONS ['claim_interval ' ];
86
89
}
@@ -114,6 +117,12 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
114
117
unset($ redisOptions ['stream_max_entries ' ]);
115
118
}
116
119
120
+ $ deleteAfterAck = null ;
121
+ if (\array_key_exists ('delete_after_ack ' , $ redisOptions )) {
122
+ $ deleteAfterAck = filter_var ($ redisOptions ['delete_after_ack ' ], FILTER_VALIDATE_BOOLEAN );
123
+ unset($ redisOptions ['delete_after_ack ' ]);
124
+ }
125
+
117
126
$ dbIndex = null ;
118
127
if (\array_key_exists ('dbindex ' , $ redisOptions )) {
119
128
$ dbIndex = filter_var ($ redisOptions ['dbindex ' ], FILTER_VALIDATE_INT );
@@ -144,6 +153,7 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
144
153
'consumer ' => $ redisOptions ['consumer ' ] ?? null ,
145
154
'auto_setup ' => $ autoSetup ,
146
155
'stream_max_entries ' => $ maxEntries ,
156
+ 'delete_after_ack ' => $ deleteAfterAck ,
147
157
'dbindex ' => $ dbIndex ,
148
158
'redeliver_timeout ' => $ redeliverTimeout ,
149
159
'claim_interval ' => $ claimInterval ,
@@ -314,6 +324,9 @@ public function ack(string $id): void
314
324
{
315
325
try {
316
326
$ acknowledged = $ this ->connection ->xack ($ this ->stream , $ this ->group , [$ id ]);
327
+ if ($ this ->deleteAfterAck ) {
328
+ $ acknowledged = $ this ->connection ->xdel ($ this ->stream , [$ id ]);
329
+ }
317
330
} catch (\RedisException $ e ) {
318
331
throw new TransportException ($ e ->getMessage (), 0 , $ e );
319
332
}
@@ -408,6 +421,18 @@ public function setup(): void
408
421
$ this ->connection ->clearLastError ();
409
422
}
410
423
424
+ if ($ this ->deleteAfterAck ) {
425
+ $ groups = $ this ->connection ->xinfo ('GROUPS ' , $ this ->stream );
426
+ if (
427
+ // support for Redis extension version 5+
428
+ (\is_array ($ groups ) && 1 < \count ($ groups ))
429
+ // support for Redis extension version 4.x
430
+ || (\is_string ($ groups ) && substr_count ($ groups , '"name" ' ))
431
+ ) {
432
+ throw new LogicException (sprintf ('More than one group exists for stream "%s", delete_after_ack can not be enabled as it risks deleting messages before all groups could consume them. ' , $ this ->stream ));
433
+ }
434
+ }
435
+
411
436
$ this ->autoSetup = false ;
412
437
}
413
438
0 commit comments