8000 Enhancement: Add Keepalive Support to Doctrine Messenger Transport · symfony/symfony@e5fcaad · GitHub
[go: up one dir, main page]

Skip to content

Commit e5fcaad

Browse files
committed
Enhancement: Add Keepalive Support to Doctrine Messenger Transport
1 parent 7b0cdc8 commit e5fcaad

File tree

4 files changed

+46
-2
lines changed

4 files changed

+46
-2
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
CHANGELOG
22
=========
33

4+
7.3
5+
---
6+
* Add "keepalive" support
7+
48
7.1
59
---
610

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,34 @@ public function reject(string $id): bool
286286
}
287287
}
288288

289+
public function keepalive(string $id, ?int $seconds = null): void
290+
{
291+
// Check if the redeliver timeout is smaller than the keepalive interval
292+
if (null !== $seconds && $this->configuration['redeliver_timeout'] < $seconds) {
293+
throw new TransportException(\sprintf('Doctrine redeliver_timeout (%ds) cannot be smaller than the keepalive interval (%ds).', $this->configuration['redeliver_timeout'], $seconds));
294+
}
295+
296+
$this->driverConnection->beginTransaction();
297+
try {
298+
$queryBuilder = $this->driverConnection->createQueryBuilder()
299+
->update($this->configuration['table_name'])
300+
->set('delivered_at', '?')
301+
->where('id = ?');
302+
$now = new \DateTimeImmutable('UTC');
303+
$this->executeStatement($queryBuilder->getSQL(), [
304+
$now,
305+
$id,
306+
], [
307+
Types::DATETIME_IMMUTABLE,
308+
]);
309+
310+
$this->driverConnection->commit();
311+
} catch (\Throwable $e) {
312+
$this->driverConnection->rollBack();
313+
throw new TransportException($e->getMessage(), 0, $e);
314+
}
315+
}
316+
289317
public function setup(): void
290318
{
291319
$configuration = $this->driverConnection->getConfiguration();

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1919
use Symfony\Component\Messenger\Exception\TransportException;
2020
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
21+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
2122
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
2223
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2324
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -26,7 +27,7 @@
2627
/**
2728
* @author Vincent Touzet <vincent.touzet@gmail.com>
2829
*/
29-
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface
30+
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface, KeepaliveReceiverInterface
3031
{
3132
private const MAX_RETRIES = 3;
3233
private int $retryingSafetyCounter = 0;
@@ -72,6 +73,11 @@ public function ack(Envelope $envelope): void
7273
});
7374
}
7475

76+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
77+
{
78+
$this->connection->keepalive($this->findDoctrineReceivedStamp($envelope)->getId(), $seconds);
79+
}
80+
7581
public function reject(Envelope $envelope): void
7682
{
7783
$this->withRetryableExceptionRetry(function () use ($envelope) {

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Doctrine\DBAL\Schema\Schema;
1616
use Doctrine\DBAL\Schema\Table;
1717
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -24,7 +25,7 @@
2425
/**
2526
* @author Vincent Touzet <vincent.touzet@gmail.com>
2627
*/
27-
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
28+
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface, KeepaliveReceiverInterface
2829
{
2930
private DoctrineReceiver $receiver;
3031
private DoctrineSender $sender;
@@ -50,6 +51,11 @@ public function reject(Envelope $envelope): void
5051
$this->getReceiver()->reject($envelope);
5152
}
5253

54+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
55+
{
56+
$this->getReceiver()->keepalive($envelope, $seconds);
57+
}
58+
5359
public function getMessageCount(): int
5460
{
5561
return $this->getReceiver()->getMessageCount();

0 commit comments

Comments
 (0)
2A24
0