From e3beb18a1ea5da48cfae8be4bb0f29cb5cd2a720 Mon Sep 17 00:00:00 2001 From: valtzu Date: Sun, 13 Oct 2024 15:18:50 +0300 Subject: [PATCH] Add `$seconds` to `keepalive` methods --- src/Symfony/Component/Console/Application.php | 8 ++++++++ src/Symfony/Component/Console/Tests/ApplicationTest.php | 1 + .../Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php | 2 +- .../Bridge/Beanstalkd/Transport/BeanstalkdTransport.php | 4 ++-- .../Messenger/Command/ConsumeMessagesCommand.php | 2 +- .../Messenger/Command/FailedMessagesRetryCommand.php | 2 +- src/Symfony/Component/Messenger/Tests/WorkerTest.php | 6 +++--- .../Transport/Receiver/KeepaliveReceiverInterface.php | 4 +++- src/Symfony/Component/Messenger/Worker.php | 4 ++-- 9 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/Symfony/Component/Console/Application.php b/src/Symfony/Component/Console/Application.php index 6075a6ef51d80..2ed1319cb952c 100644 --- a/src/Symfony/Component/Console/Application.php +++ b/src/Symfony/Component/Console/Application.php @@ -139,6 +139,14 @@ public function setAlarmInterval(?int $seconds): void $this->scheduleAlarm(); } + /** + * Gets the interval in seconds on which a SIGALRM signal is dispatched. + */ + public function getAlarmInterval(): ?int + { + return $this->alarmInterval; + } + private function scheduleAlarm(): void { if (null !== $this->alarmInterval) { diff --git a/src/Symfony/Component/Console/Tests/ApplicationTest.php b/src/Symfony/Component/Console/Tests/ApplicationTest.php index a4ec0de7c4fc1..4693535051124 100644 --- a/src/Symfony/Component/Console/Tests/ApplicationTest.php +++ b/src/Symfony/Component/Console/Tests/ApplicationTest.php @@ -2305,6 +2305,7 @@ public function testAlarmDispatchWithoutEventDispatcher() $application = $this->createSignalableApplication($command, null); $this->assertSame(1, $application->run(new ArrayInput(['alarm']))); + $this->assertSame(1, $application->getAlarmInterval()); $this->assertTrue($command->signaled); } diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php index bc32a720f59af..7dd19e8c90a8b 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php @@ -65,7 +65,7 @@ public function reject(Envelope $envelope): void $this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId()); } - public function keepalive(Envelope $envelope): void + public function keepalive(Envelope $envelope, ?int $seconds = null): void { $this->connection->keepalive($this->findBeanstalkdReceivedStamp($envelope)->getId()); } diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php index 9a9a16acb2c08..6400e15653807 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php @@ -49,9 +49,9 @@ public function reject(Envelope $envelope): void $this->getReceiver()->reject($envelope); } - public function keepalive(Envelope $envelope): void + public function keepalive(Envelope $envelope, ?int $seconds = null): void { - $this->getReceiver()->keepalive($envelope); + $this->getReceiver()->keepalive($envelope, $seconds); } public function getMessageCount(): int diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index 0fe7b4357f880..9345a71b97a10 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -286,7 +286,7 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int| if (\SIGALRM === $signal) { $this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]); - $this->worker->keepalive(); + $this->worker->keepalive($this->getApplication()->getAlarmInterval()); return false; } diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index b3c7113433962..47bcd1463a915 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -156,7 +156,7 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int| if (\SIGALRM === $signal) { $this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]); - $this->worker->keepalive(); + $this->worker->keepalive($this->getApplication()->getAlarmInterval()); return false; } diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 831055fa4c2cb..6520f8efbc3c6 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -641,7 +641,7 @@ public function testKeepalive() try { $oldAsync = pcntl_async_signals(true); - pcntl_signal(\SIGALRM, fn () => $worker->keepalive()); + pcntl_signal(\SIGALRM, fn () => $worker->keepalive(2)); pcntl_alarm(2); $worker->run(); @@ -654,7 +654,7 @@ public function testKeepalive() $this->assertSame($expectedEnvelopes, $receiver->keepaliveEnvelopes); $receiver->keepaliveEnvelopes = []; - $worker->keepalive(); + $worker->keepalive(2); $this->assertCount(0, $receiver->keepaliveEnvelopes); } @@ -672,7 +672,7 @@ class DummyKeepaliveReceiver extends DummyReceiver implements KeepaliveReceiverI { public array $keepaliveEnvelopes = []; - public function keepalive(Envelope $envelope): void + public function keepalive(Envelope $envelope, ?int $seconds = null): void { $this->keepaliveEnvelopes[] = $envelope; } diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php index 5196aca3c5987..b978b2a5db2c8 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php @@ -19,7 +19,9 @@ interface KeepaliveReceiverInterface extends ReceiverInterface /** * Informs the transport that the message is still being processed to avoid a timeout on the transport's side. * + * @param int|null $seconds The minimum duration the message should be kept alive + * * @throws TransportException If there is an issue communicating with the transport */ - public function keepalive(Envelope $envelope): void; + public function keepalive(Envelope $envelope, ?int $seconds = null): void; } diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 763b4e11a6dbd..a3043d7238a7e 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -290,7 +290,7 @@ public function stop(): void $this->shouldStop = true; } - public function keepalive(): void + public function keepalive(?int $seconds): void { foreach ($this->keepalives as $message) { [$transportName, $envelope] = $this->keepalives[$message]; @@ -303,7 +303,7 @@ public function keepalive(): void 'transport' => $transportName, 'message_id' => $envelope->last(TransportMessageIdStamp::class)?->getId(), ]); - $this->receivers[$transportName]->keepalive($envelope); + $this->receivers[$transportName]->keepalive($envelope, $seconds); } }