8000 feature #59601 [Messenger] Add keepalive support (silasjoisten) · symfony/symfony@ec4b5c7 · GitHub
[go: up one dir, main page]

Skip to content

Commit ec4b5c7

Browse files
committed
feature #59601 [Messenger] Add keepalive support (silasjoisten)
This PR was squashed before being merged into the 7.3 branch. Discussion ---------- [Messenger] Add keepalive support | Q | A | ------------- | --- | Branch? | 7.3 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | - | License | MIT This Pull Request adds keepalive support to the Doctrine Messenger transport by implementing the keepalive method in the Connection class. This enhancement aligns Doctrine transport with the existing **keepalive** functionality already supported by other Messenger transports, such as Redis and Beanstalkd. The **keepalive** principle was introduced in Symfony 7.2 to address issues where long-running message processing could lead to premature message timeouts and redelivery. By keeping the message “alive” on the transport layer, the message remains marked as being processed until explicitly acknowledged. Other transports like Redis and Beanstalkd have already implemented this feature. This PR extends the functionality to the Doctrine transport. Commits ------- 970c07b [Messenger] Add keepalive support
2 parents 1f83bf1 + 970c07b commit ec4b5c7

File tree

8 files changed

+172
-3
lines changed

8 files changed

+172
-3
lines changed

src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.3
5+
---
6+
7+
* Add "keepalive" support
8+
49
7.1
510
---
611

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php

+91
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,97 @@ public function testSendLastInsertIdReturnsInteger()
299299
self::assertSame('1', $id);
300300
}
301301

302+
public function testKeepalive()
303+
{
304+
$queryBuilder = $this->getQueryBuilderMock();
305+
$driverConnection = $this->getDBALConnectionMock();
306+
307+
$connection = new Connection(['redeliver_timeout' => 30, 'table_name' => 'messenger_messages'], $driverConnection);
308+
309+
$queryBuilder->expects($this->once())
310+
->method('update')
311+
->with('messenger_messages')
312+
->willReturnSelf();
313+
314+
$queryBuilder->expects($this->once())
315+
->method('set')
316+
->with('delivered_at', '?')
317+
->willReturnSelf();
318+
319+
$queryBuilder->expects($this->once())
320+
->method('where')
321+
->with('id = ?')
322+
->willReturnSelf();
323+
324+
$driverConnection->expects($this->once())
325+
->method('beginTransaction');
326+
327+
$driverConnection->expects($this->once())
328+
->method('createQueryBuilder')
329+
->willReturn($queryBuilder);
330+
331+
$driverConnection->expects($this->once())
332+
->method('commit');
333+
334+
$connection->keepalive('1');
335+
}
336+
337+
public function testKeepaliveRollback()
338+
{
339+
$queryBuilder = $this->getQueryBuilderMock();
340+
$driverConnection = $this->getDBALConnectionMock();
< 9E88 /td>341+
342+
$connection = new Connection(['redeliver_timeout' => 30, 'table_name' => 'messenger_messages'], $driverConnection);
343+
344+
$queryBuilder->expects($this->once())
345+
->method('update')
346+
->with('messenger_messages')
347+
->willReturnSelf();
348+
349+
$queryBuilder->expects($this->once())
350+
->method('set')
351+
->with('delivered_at', '?')
352+
->willReturnSelf();
353+
354+
$queryBuilder->expects($this->once())
355+
->method('where')
356+
->with('id = ?')
357+
->willReturnSelf();
358+
359+
$driverConnection->expects($this->once())
360+
->method('beginTransaction');
361+
362+
$driverConnection->expects($this->once())
363+
->method('createQueryBuilder')
364+
->willReturn($queryBuilder);
365+
366+
$driverConnection->expects($this->once())
367+
->method('executeStatement')
368+
->willThrowException($this->createMock(DBALException::class));
369+
370+
$driverConnection->expects($this->never())
371+
->method('commit');
372+
373+
$driverConnection->expects($this->once())
374+
->method('rollBack');
375+
376+
$this->expectException(TransportException::class);
377+
378+
$connection->keepalive('1');
379+
}
380+
381+
public function testKeepaliveThrowsExceptionWhenRedeliverTimeoutIsLessThenInterval()
382+
{
383+
$driverConnection = $this->getDBALConnectionMock();
384+
385+
$connection = new Connection(['redeliver_timeout' => 30], $driverConnection);
386+
387+
$this->expectException(TransportException::class);
388+
$this->expectExceptionMessage('Doctrine redeliver_timeout (30s) cannot be smaller than the keepalive interval (60s).');
389+
390+
$connection->keepalive('1', 60);
391+
}
392+
302393
private function getDBALConnectionMock()
303394
{
304395
$driverConnection = $this->createMock(DBALConnection::class);

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php

+16
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,22 @@ public function testRejectThrowsException()
353353
$receiver->reject($envelope);
354354
}
355355

356+
public function testKeepalive()
357+
{
358+
$serializer = $this->createSerializer();
359+
$connection = $this->createMock(Connection::class);
360+
361+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
362+
$receiver = new DoctrineReceiver($connection, $serializer);
363+
364+
$connection
365+
->expects($this->once())
366+
->method('keepalive')
367+
->with('1');
368+
369+
$receiver->keepalive($envelope);
370+
}
371+
356372
private function createDoctrineEnvelope(): array
357373
{
358374
return [

src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php

+17
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use PHPUnit\Framework\TestCase;
1717
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
1818
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
19+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceivedStamp;
1920
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
2021
use Symfony\Component\Messenger\Envelope;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerI F987 nterface;
@@ -69,6 +70,22 @@ public function testConfigureSchema()
6970
$transport->configureSchema($schema, $dbalConnection, static fn () => true);
7071
}
7172

73+
public function testKeepalive()
74+
{
75+
$transport = $this->getTransport(
76+
null,
77+
$connection = $this->createMock(Connection::class)
78+
);
79+
80+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
81+
82+
$connection->expects($this->once())
83+
->method('keepalive')
84+
->with('1');
85+
86+
$transport->keepalive($envelope);
87+
}
88+
7289
private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): DoctrineTransport
7390
{
7491
$serializer ??= $this->createMock(SerializerInterface::class);

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

+28
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,34 @@ public function reject(string $id): bool
286286
}
287287
}
288288

289+
public function keepalive(string $id, ?int $seconds = null): void
290+
{
291+
// Check if the redeliver timeout is smaller than the keepalive interval
292+
if (null !== $seconds && $this->configuration['redeliver_timeout'] < $seconds) {
293+
throw new TransportException(\sprintf('Doctrine redeliver_timeout (%ds) cannot be smaller than the keepalive interval (%ds).', $this->configuration['redeliver_timeout'], $seconds));
294+
}
295+
296+
$this->driverConnection->beginTransaction();
297+
try {
298+
$queryBuilder = $this->driverConnection->createQueryBuilder()
299+
->update($this->configuration['table_name'])
300+
->set('delivered_at', '?')
301+
->where('id = ?');
302+
$now = new \DateTimeImmutable('UTC');
303+
$this->executeStatement($queryBuilder->getSQL(), [
304+
$now,
305+
$id,
306+
], [
307+
Types::DATETIME_IMMUTABLE,
308+
]);
309+
310+
$this->driverConnection->commit();
311+
} catch (\Throwable $e) {
312+
$this->driverConnection->rollBack();
313+
throw new TransportException($e->getMessage(), 0, $e);
314+
}
315+
}
316+
289317
public function setup(): void
290318
{
291319
$configuration = $this->driverConnection->getConfiguration();

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php

+7-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1919
use Symfony\Component\Messenger\Exception\TransportException;
2020
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
21+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
2122
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
2223
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2324
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -26,7 +27,7 @@
2627
/**
2728
* @author Vincent Touzet <vincent.touzet@gmail.com>
2829
*/
29-
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface
30+
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface, KeepaliveReceiverInterface
3031
{
3132
private const MAX_RETRIES = 3;
3233
private int $retryingSafetyCounter = 0;
@@ -72,6 +73,11 @@ public function ack(Envelope $envelope): void
7273
});
7374
}
7475

76+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
77+
{
78+
$this->connection->keepalive($this->findDoctrineReceivedStamp($envelope)->getId(), $seconds);
79+
}
80+
7581
public function reject(Envelope $envelope): void
7682
{
7783
$this->withRetryableExceptionRetry(function () use ($envelope) {

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php

+7-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Doctrine\DBAL\Schema\Schema;
1616
use Doctrine\DBAL\Schema\Table;
1717
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -24,7 +25,7 @@
2425
/**
2526
* @author Vincent Touzet <vincent.touzet@gmail.com>
2627
*/
27-
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
28+
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface, KeepaliveReceiverInterface
2829
{
2930
private DoctrineReceiver $receiver;
3031
private DoctrineSender $sender;
@@ -50,6 +51,11 @@ public function reject(Envelope $envelope): void
5051
$this->getReceiver()->reject($envelope);
5152
}
5253

54+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
55+
{
56+
$this->getReceiver()->keepalive($envelope, $seconds);
57+
}
58+
5359
public function getMessageCount(): int
5460
{
5561
return $this->getReceiver()->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"require": {
1919
"php": ">=8.2",
2020
"doctrine/dbal": "^3.6|^4",
21-
"symfony/messenger": "^6.4|^7.0",
21+
"symfony/messenger": "^7.2",
2222
"symfony/service-contracts": "^2.5|^3"
2323
},
2424
"require-dev": {

0 commit comments

Comments
 (0)
0