diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md index 27a75f58d1e82..b18222013f6c1 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +7.2 +--- + + * Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying Beanstalkd that the job is still being processed, in order to avoid timeouts + 5.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php index b8f6d317a9dad..7b402746815fb 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php @@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\Serializer; @@ -78,6 +79,17 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException() $receiver->get(); } + public function testKeepalive() + { + $serializer = $this->createSerializer(); + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('keepalive')->with(1); + + $receiver = new BeanstalkdReceiver($connection, $serializer); + $receiver->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')])); + } + private function createBeanstalkdEnvelope(): array { return [ diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdTransportTest.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdTransportTest.php index cdbd80e980ce0..507c50bb4480b 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdTransportTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdTransportTest.php @@ -13,6 +13,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransport; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection; use Symfony\Component\Messenger\Envelope; @@ -50,6 +51,18 @@ public function testReceivesMessages() $this->assertSame($decodedMessage, $envelopes[0]->getMessage()); } + public function testKeepalive() + { + $transport = $this->getTransport( + null, + $connection = $this->createMock(Connection::class), + ); + + $connection->expects($this->once())->method('keepalive')->with(1); + + $transport->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')])); + } + private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): BeanstalkdTransport { $serializer ??= $this->createMock(SerializerInterface::class); diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php index f4cc745846584..eaacfa4b60b99 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php @@ -330,4 +330,37 @@ public function testSendWhenABeanstalkdExceptionOccurs() $connection->send($body, $headers, $delay); } + + public function testKeepalive() + { + $id = 123456; + + $tube = 'baz'; + + $client = $this->createMock(PheanstalkInterface::class); + $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id)); + + $connection = new Connection(['tube_name' => $tube], $client); + + $connection->keepalive((string) $id); + } + + public function testKeepaliveWhenABeanstalkdExceptionOccurs() + { + $id = 123456; + + $tube = 'baz123'; + + $exception = new ServerException('baz error'); + + $client = $this->createMock(PheanstalkInterface::class); + $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception); + + $connection = new Connection(['tube_name' => $tube], $client); + + $this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception)); + $connection->keepalive((string) $id); + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php index a115d66f12e04..bc32a720f59af 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php @@ -14,15 +14,15 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; -use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; /** * @author Antonio Pauletich */ -class BeanstalkdReceiver implements ReceiverInterface, MessageCountAwareInterface +class BeanstalkdReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface { private SerializerInterface $serializer; @@ -65,6 +65,11 @@ public function reject(Envelope $envelope): void $this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId()); } + public function keepalive(Envelope $envelope): void + { + $this->connection->keepalive($this->findBeanstalkdReceivedStamp($envelope)->getId()); + } + public function getMessageCount(): int { return $this->connection->getMessageCount(); diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php index 4efc9fd42f23b..9a9a16acb2c08 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -20,7 +21,7 @@ /** * @author Antonio Pauletich */ -class BeanstalkdTransport implements TransportInterface, MessageCountAwareInterface +class BeanstalkdTransport implements TransportInterface, KeepaliveReceiverInterface, MessageCountAwareInterface { private SerializerInterface $serializer; private BeanstalkdReceiver $receiver; @@ -48,6 +49,11 @@ public function reject(Envelope $envelope): void $this->getReceiver()->reject($envelope); } + public function keepalive(Envelope $envelope): void + { + $this->getReceiver()->keepalive($envelope); + } + public function getMessageCount(): int { return $this->getReceiver()->getMessageCount(); diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php index c7255459ed370..c2c5cd7ee49f8 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php @@ -180,6 +180,15 @@ public function reject(string $id): void } } + public function keepalive(string $id): void + { + try { + $this->client->useTube($this->tube)->touch(new JobId((int) $id)); + } catch (Exception $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } + public function getMessageCount(): int { try { diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json index e0b0eedbf933e..549271b4b431a 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json @@ -14,7 +14,7 @@ "require": { "php": ">=8.2", "pda/pheanstalk": "^4.0", - "symfony/messenger": "^6.4|^7.0" + "symfony/messenger": "^7.2" }, "require-dev": { "symfony/property-access": "^6.4|^7.0", diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 3a1944824cc73..4d492dfd49524 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -10,6 +10,7 @@ CHANGELOG * Add `--format` option to the `messenger:stats` command * Add `getRetryDelay()` method to `RecoverableExceptionInterface` * Add `skip` option to `messenger:failed:retry` command when run interactively to skip message and requeue it + * Add the ability to asynchronously notify transports about which messages are still being processed by the worker, using `pcntl_alarm()` 7.1 --- diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index a974859f6b23c..0fe7b4357f880 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -43,6 +43,8 @@ #[AsCommand(name: 'messenger:consume', description: 'Consume messages')] class ConsumeMessagesCommand extends Command implements SignalableCommandInterface { + private const DEFAULT_KEEPALIVE_INTERVAL = 5; + private ?Worker $worker = null; public function __construct( @@ -75,6 +77,7 @@ protected function configure(): void new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'), new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'), new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'), + new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL), ]) ->setHelp(<<<'EOF' The %command.name% command consumes messages and dispatches them to the message bus. @@ -124,6 +127,13 @@ protected function configure(): void ; } + protected function initialize(InputInterface $input, OutputInterface $output): void + { + if ($input->hasParameterOption('--keepalive')) { + $this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL)); + } + } + protected function interact(InputInterface $input, OutputInterface $output): void { $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); @@ -264,7 +274,7 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti public function getSubscribedSignals(): array { - return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT] : []); + return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT, \SIGALRM] : []); } public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false @@ -273,6 +283,14 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int| return false; } + if (\SIGALRM === $signal) { + $this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]); + + $this->worker->keepalive(); + + return false; + } + $this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]); $this->worker->stop(); diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index aac0ec991059d..b3c7113433962 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -41,6 +41,8 @@ #[AsCommand(name: 'messenger:failed:retry', description: 'Retry one or more messages from the failure transport')] class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface { + private const DEFAULT_KEEPALIVE_INTERVAL = 5; + private bool $shouldStop = false; private bool $forceExit = false; private ?Worker $worker = null; @@ -64,6 +66,7 @@ protected function configure(): void new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'), new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'), new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION), + new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL), ]) ->setHelp(<<<'EOF' The %command.name% retries message in the failure transport. @@ -87,6 +90,13 @@ protected function configure(): void ; } + protected function initialize(InputInterface $input, OutputInterface $output): void + { + if ($input->hasParameterOption('--keepalive')) { + $this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL)); + } + } + protected function execute(InputInterface $input, OutputInterface $output): int { $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); @@ -134,7 +144,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int public function getSubscribedSignals(): array { - return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT] : []); + return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT, \SIGALRM] : []); } public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false @@ -143,6 +153,14 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int| return false; } + if (\SIGALRM === $signal) { + $this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]); + + $this->worker->keepalive(); + + return false; + } + $this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]); $this->worker->stop(); diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 12deb98f25358..831055fa4c2cb 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -14,6 +14,7 @@ use PHPUnit\Framework\TestCase; use Psr\EventDispatcher\EventDispatcherInterface; use Psr\Log\LoggerInterface; +use Symfony\Bridge\PhpUnit\ClockMock; use Symfony\Component\Clock\MockClock; use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter; @@ -43,6 +44,7 @@ use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; use Symfony\Component\Messenger\Tests\Fixtures\DummyReceiver; use Symfony\Component\Messenger\Tests\Fixtures\ResettableDummyReceiver; +use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; @@ -598,6 +600,64 @@ public function testGcCollectCyclesIsCalledOnMessageHandle() $this->assertGreaterThan(0, $gcStatus['runs']); } + + /** + * @requires extension pcntl + */ + public function testKeepalive() + { + ClockMock::withClockMock(false); + + $expectedEnvelopes = [ + new Envelope(new DummyMessage('Hey')), + new Envelope(new DummyMessage('Bob')), + ]; + + $receiver = new DummyKeepaliveReceiver([ + [$expectedEnvelopes[0]], + [$expectedEnvelopes[1]], + ]); + + $handler = new DummyBatchHandler(3); + + $middleware = new HandleMessageMiddleware(new HandlersLocator([ + DummyMessage::class => [new HandlerDescriptor($handler)], + ])); + + $bus = new MessageBus([$middleware]); + + $dispatcher = new EventDispatcher(); + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use ($receiver) { + static $i = 0; + if (1 < ++$i) { + $event->getWorker()->stop(); + $this->assertSame(2, $receiver->getAcknowledgeCount()); + } else { + $this->assertSame(0, $receiver->getAcknowledgeCount()); + } + }); + + $worker = new Worker([$receiver], $bus, $dispatcher, clock: new MockClock()); + + try { + $oldAsync = pcntl_async_signals(true); + pcntl_signal(\SIGALRM, fn () => $worker->keepalive()); + pcntl_alarm(2); + + $worker->run(); + } finally { + pcntl_async_signals($oldAsync); + pcntl_signal(\SIGALRM, \SIG_DFL); + } + + $this->assertCount(2, $receiver->keepaliveEnvelopes); + $this->assertSame($expectedEnvelopes, $receiver->keepaliveEnvelopes); + + $receiver->keepaliveEnvelopes = []; + $worker->keepalive(); + + $this->assertCount(0, $receiver->keepaliveEnvelopes); + } } class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface @@ -608,12 +668,26 @@ public function getFromQueues(array $queueNames): iterable } } +class DummyKeepaliveReceiver extends DummyReceiver implements KeepaliveReceiverInterface +{ + public array $keepaliveEnvelopes = []; + + public function keepalive(Envelope $envelope): void + { + $this->keepaliveEnvelopes[] = $envelope; + } +} + class DummyBatchHandler implements BatchHandlerInterface { use BatchHandlerTrait; public array $processedMessages; + public function __construct(private ?int $delay = null) + { + } + public function __invoke(DummyMessage $message, ?Acknowledger $ack = null) { return $this->handle($message, $ack); @@ -628,6 +702,12 @@ private function process(array $jobs): void { $this->processedMessages = array_column($jobs, 0); + if (null !== $this->delay) { + for ($i = 0; $i < $this->delay; ++$i) { + sleep(1); + } + } + foreach ($jobs as [$job, $ack]) { $ack->ack($job); } diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php new file mode 100644 index 0000000000000..5196aca3c5987 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php @@ -0,0 +1,25 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Receiver; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\TransportException; + +interface KeepaliveReceiverInterface extends ReceiverInterface +{ + /** + * Informs the transport that the message is still being processed to avoid a timeout on the transport's side. + * + * @throws TransportException If there is an issue communicating with the transport + */ + public function keepalive(Envelope $envelope): void; +} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index c4c1a2462dc1c..763b4e11a6dbd 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -31,6 +31,7 @@ use Symfony\Component\Messenger\Stamp\NoAutoAckStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; +use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\RateLimiter\LimiterInterface; @@ -47,6 +48,10 @@ class Worker private WorkerMetadata $metadata; private array $acks = []; private \SplObjectStorage $unacks; + /** + * @var \SplObjectStorage + */ + private \SplObjectStorage $keepalives; /** * @param ReceiverInterface[] $receivers Where the key is the transport name @@ -63,6 +68,7 @@ public function __construct( 'transportNames' => array_keys($receivers), ]); $this->unacks = new \SplObjectStorage(); + $this->keepalives = new \SplObjectStorage(); } /** @@ -105,6 +111,10 @@ public function run(array $options = []): void foreach ($envelopes as $envelope) { $envelopeHandled = true; + if ($receiver instanceof KeepaliveReceiverInterface) { + $this->keepalives[$envelope->getMessage()] = [$transportName, $envelope]; + } + $this->rateLimit($transportName); $this->handleMessage($envelope, $transportName); $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false)); @@ -186,6 +196,7 @@ private function ack(): bool if ($rejectFirst = $e instanceof RejectRedeliveredMessageException) { // redelivered messages are rejected first so that continuous failures in an event listener or while // publishing for retry does not cause infinite redelivery loops + unset($this->keepalives[$envelope->getMessage()]); $receiver->reject($envelope); } @@ -199,6 +210,7 @@ private function ack(): bool $envelope = $failedEvent->getEnvelope(); if (!$rejectFirst) { + unset($this->keepalives[$envelope->getMessage()]); $receiver->reject($envelope); } @@ -218,6 +230,7 @@ private function ack(): bool $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context); } + unset($this->keepalives[$envelope->getMessage()]); $receiver->ack($envelope); } @@ -277,6 +290,23 @@ public function stop(): void $this->shouldStop = true; } + public function keepalive(): void + { + foreach ($this->keepalives as $message) { + [$transportName, $envelope] = $this->keepalives[$message]; + + if (!$this->receivers[$transportName] instanceof KeepaliveReceiverInterface) { + throw new RuntimeException(\sprintf('Receiver for "%s" does not implement "%s".', $transportName, KeepaliveReceiverInterface::class)); + } + + $this->logger?->info('Sending keepalive request.', [ + 'transport' => $transportName, + 'message_id' => $envelope->last(TransportMessageIdStamp::class)?->getId(), + ]); + $this->receivers[$transportName]->keepalive($envelope); + } + } + public function getMetadata(): WorkerMetadata { return $this->metadata; diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index 3fdfe4a55ee26..1f12f16662f88 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -23,7 +23,7 @@ }, "require-dev": { "psr/cache": "^1.0|^2.0|^3.0", - "symfony/console": "^6.4|^7.0", + "symfony/console": "^7.2", "symfony/dependency-injection": "^6.4|^7.0", "symfony/event-dispatcher": "^6.4|^7.0", "symfony/http-kernel": "^6.4|^7.0", @@ -37,7 +37,7 @@ "symfony/validator": "^6.4|^7.0" }, "conflict": { - "symfony/console": "<6.4", + "symfony/console": "<7.2", "symfony/event-dispatcher": "<6.4", "symfony/event-dispatcher-contracts": "<2.5", "symfony/framework-bundle": "<6.4",