8000 [Messenger] Add keepalive support to Doctrine transport by silasjoisten · Pull Request #59601 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Add keepalive support to Doctrine transport #59601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[Messenger] Add keepalive support
  • Loading branch information
silasjoisten authored and fabpot committed Jan 25, 2025
commit 970c07beaaac267b27558813c8fd8917411337dc
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

7.3
---

* Add "keepalive" support

7.1
---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,97 @@ public function testSendLastInsertIdReturnsInteger()
self::assertSame('1', $id);
}

public function testKeepalive()
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();

$connection = new Connection(['redeliver_timeout' => 30, 'table_name' => 'messenger_messages'], $driverConnection);

$queryBuilder->expects($this->once())
->method('update')
->with('messenger_messages')
->willReturnSelf();

$queryBuilder->expects($this->once())
->method('set')
->with('delivered_at', '?')
->willReturnSelf();

$queryBuilder->expects($this->once())
->method('where')
->with('id = ?')
->willReturnSelf();

$driverConnection->expects($this->once())
->method('beginTransaction');

$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);

$driverConnection->expects($this->once())
->method('commit');

$connection->keepalive('1');
}

public function testKeepaliveRollback()
{
$queryBuilder = $this->getQueryBuilderMock();
$driverConnection = $this->getDBALConnectionMock();

$connection = new Connection(['redeliver_timeout' => 30, 'table_name' => 'messenger_messages'], $driverConnection);

$queryBuilder->expects($this->once())
->method('update')
->with('messenger_messages')
->willReturnSelf();

$queryBuilder->expects($this->once())
->method('set')
->with('delivered_at', '?')
->willReturnSelf();

$queryBuilder->expects($this->once())
->method('where')
->with('id = ?')
->willReturnSelf();

$driverConnection->expects($this->once())
->method('beginTransaction');

$driverConnection->expects($this->once())
->method('createQueryBuilder')
->willReturn($queryBuilder);

$driverConnection->expects($this->once())
->method('executeStatement')
->willThrowException($this->createMock(DBALException::class));

$driverConnection->expects($this->never())
->method('commit');

$driverConnection->expects($this->once())
->method('rollBack');

$this->expectException(TransportException::class);

$connection->keepalive('1');
}

public function testKeepaliveThrowsExceptionWhenRedeliverTimeoutIsLessThenInterval()
{
$driverConnection = $this->getDBALConnectionMock();

$connection = new Connection(['redeliver_timeout' => 30], $driverConnection);

$this->expectException(TransportException::class);
$this->expectExceptionMessage('Doctrine redeliver_timeout (30s) cannot be smaller than the keepalive interval (60s).');

$connection->keepalive('1', 60);
}

private function getDBALConnectionMock()
{
$driverConnection = $this->createMock(DBALConnection::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,22 @@ public function testRejectThrowsException()
$receiver->reject($envelope);
}

public function testKeepalive()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);

$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
$receiver = new DoctrineReceiver($connection, $serializer);

$connection
->expects($this->once())
->method('keepalive')
->with('1');

$receiver->keepalive($envelope);
}

private function createDoctrineEnvelope(): array
{
return [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceivedStamp;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand Down Expand Up @@ -69,6 +70,22 @@ public function testConfigureSchema()
$transport->configureSchema($schema, $dbalConnection, static fn () => true);
}

public function testKeepalive()
{
$transport = $this->getTransport(
null,
$connection = $this->createMock(Connection::class)
);

$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);

$connection->expects($this->once())
->method('keepalive')
->with('1');

$transport->keepalive($envelope);
}

private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): DoctrineTransport
{
$serializer ??= $this->createMock(SerializerInterface::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,34 @@ public function reject(string $id): bool
}
}

public function keepalive(string $id, ?int $seconds = null): void
{
// Check if the redeliver timeout is smaller than the keepalive interval
if (null !== $seconds && $this->configuration['redeliver_timeout'] < $seconds) {
throw new TransportException(\sprintf('Doctrine redeliver_timeout (%ds) cannot be smaller than the keepalive interval (%ds).', $this->configuration['redeliver_timeout'], $seconds));
}

$this->driverConnection->beginTransaction();
try {
$queryBuilder = $this->driverConnection->createQueryBuilder()
->update($this->configuration['table_name'])
->set('delivered_at', '?')
->where('id = ?');
$now = new \DateTimeImmutable('UTC');
$this->executeStatement($queryBuilder->getSQL(), [
$now,
$id,
], [
Types::DATETIME_IMMUTABLE,
]);

$this->driverConnection->commit();
} catch (\Throwable $e) {
$this->driverConnection->rollBack();
throw new TransportException($e->getMessage(), 0, $e);
}
}

public function setup(): void
{
$configuration = $this->driverConnection->getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
Expand All @@ -26,7 +27,7 @@
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface, KeepaliveReceiverInterface
{
private const MAX_RETRIES = 3;
private int $retryingSafetyCounter = 0;
Expand Down Expand Up @@ -72,6 +73,11 @@ public function ack(Envelope $envelope): void
});
}

public function keepalive(Envelope $envelope, ?int $seconds = null): void
{
$this->connection->keepalive($this->findDoctrineReceivedStamp($envelope)->getId(), $seconds);
}

public function reject(Envelope $envelope): void
{
$this->withRetryableExceptionRetry(function () use ($envelope) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Table;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -24,7 +25,7 @@
/**
* @author Vincent Touzet <vincent.touzet@gmail.com>
*/
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface, KeepaliveReceiverInterface
{
private DoctrineReceiver $receiver;
private DoctrineSender $sender;
Expand All @@ -50,6 +51,11 @@ public function reject(Envelope $envelope): void
$this->getReceiver()->reject($envelope);
}

public function keepalive(Envelope $envelope, ?int $seconds = null): void
{
$this->getReceiver()->keepalive($envelope, $seconds);
}

public function getMessageCount(): int
{
return $this->getReceiver()->getMessageCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"require": {
"php": ">=8.2",
"doctrine/dbal": "^3.6|^4",
"symfony/messenger": "^6.4|^7.0",
"symfony/messenger": "^7.2",
"symfony/service-contracts": "^2.5|^3"
},
"require-dev": {
Expand Down
Loading
0