8000 [Messenger] Notify transports which messages are still being processe… · symfony/symfony@317d963 · GitHub
[go: up one dir, main page]

Skip to content

Commit 317d963

Browse files
committed
[Messenger] Notify transports which messages are still being processed, using pcntl_alarm()
1 parent ecb03f9 commit 317d963

File tree

14 files changed

+270
-7
lines changed

14 files changed

+270
-7
lines changed

src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.1
5+
---
6+
7+
* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying Beanstalkd that the job is still being processed, in order to avoid timeouts
8+
49
5.2.0
510
-----
611

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1717
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1818
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
19+
use Symfony\Component\Messenger\Envelope;
1920
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -78,6 +79,17 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7879
$receiver->get();
7980
}
8081

82+
public function testKeepalive()
83+
{
84+
$serializer = $this->createSerializer();
85+
86+
$connection = $this->createMock(Connection::class);
87+
$connection->expects($this->once())->method('keepalive')->with(1);
88+
89+
$receiver = new BeanstalkdReceiver($connection, $serializer);
90+
$receiver->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
91+
}
92+
8193
private function createBeanstalkdEnvelope(): array
8294
{
8395
return [

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdTransportTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransport;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
@@ -50,6 +51,18 @@ public function testReceivesMessages()
5051
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5152
}
5253

54+
public function testKeepalive()
55+
{
56+
$transport = $this->getTransport(
57+
null,
58+
$connection = $this->createMock(Connection::class),
59+
);
60+
61+
$connection->expects($this->once())->method('keepalive')->with(1);
62+
63+
$transport->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
64+
}
65+
5366
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null): BeanstalkdTransport
5467
{
5568
$serializer ??= $this->createMock(SerializerInterface::class);

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,37 @@ public function testSendWhenABeanstalkdExceptionOccurs()
330330

331331
$connection->send($body, $headers, $delay);
332332
}
333+
334+
public function testKeepalive()
335+
{
336+
$id = 123456;
337+
338+
$tube = 'baz';
339+
340+
$client = $this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
342+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
343+
344+
$connection = new Connection(['tube_name' => $tube], $client);
345+
346+
$connection->keepalive((string) $id);
347+
}
348+
349+
public function testKeepaliveWhenABeanstalkdExceptionOccurs()
350+
{
351+
$id = 123456;
352+
353+
$tube = 'baz123';
354+
355+
$exception = new ServerException('baz error');
356+
357+
$client = $this->createMock(PheanstalkInterface::class);
358+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
359+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
360+
361+
$connection = new Connection(['tube_name' => $tube], $client);
362+
363+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
364+
$connection->keepalive((string) $id);
365+
}
333366
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
18-
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1919
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2020
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2121

2222
/**
2323
* @author Antonio Pauletich <antonio.pauletich95@gmail.com>
2424
*/
25-
class BeanstalkdReceiver implements ReceiverInterface, MessageCountAwareInterface
25+
class BeanstalkdReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface
2626
{
2727
private Connection $connection;
2828
private SerializerInterface $serializer;
@@ -65,6 +65,11 @@ public function reject(Envelope $envelope): void
6565
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
6666
}
6767

68+
public function keepalive(Envelope $envelope): void
69+
{
70+
$this->connection->keepalive($this->findBeanstalkdReceivedStamp($envelope)->getId());
71+
}
72+
6873
public function getMessageCount(): int
6974
{
7075
return $this->connection->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1718
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -20,7 +21,7 @@
2021
/**
2122
* @author Antonio Pauletich <antonio.pauletich95@gmail.com>
2223
*/
23-
class BeanstalkdTransport implements TransportInterface, MessageCountAwareInterface
24+
class BeanstalkdTransport implements TransportInterface, KeepaliveReceiverInterface, MessageCountAwareInterface
2425
{
2526
private Connection $connection;
2627
private SerializerInterface $serializer;
@@ -48,6 +49,11 @@ public function reject(Envelope $envelope): void
4849
$this->getReceiver()->reject($envelope);
4950
}
5051

52+
public function keepalive(Envelope $envelope): void
53+
{
54+
$this->getReceiver()->keepalive($envelope);
55+
}
56+
5157
public function getMessageCount(): int
5258
{
5359
return $this->getReceiver()->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,15 @@ public function reject(string $id): void
179179
}
180180
}
181181

182+
public function keepalive(string $id): void
183+
{
184+
try {
185+
$this->client->useTube($this->tube)->touch(new JobId((int) $id));
186+
} catch (Exception $exception) {
187+
throw new TransportException($exception->getMessage(), 0, $exception);
188+
}
189+
}
190+
182191
public function getMessageCount(): int
183192
{
184193
try {

src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php": ">=8.2",
1616
"pda/pheanstalk": "^4.0",
17-
"symfony/messenger": "^6.4|^7.0"
17+
"symfony/messenger": "^7.1"
1818
},
1919
"require-dev": {
2020
"symfony/property-access": "^6.4|^7.0",

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ CHANGELOG
88
* Add `--all` option to the `messenger:consume` command
99
* Make `#[AsMessageHandler]` final
1010
* Add parameter `$jitter` to `MultiplierRetryStrategy` in order to randomize delay and prevent the thundering herd effect
11+
* Add the ability to asynchronously notify transports about which messages are still being processed by the handler, using `pcntl_alarm()`
1112

1213
7.0
1314
---
< 10000 div class="d-flex flex-row">

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Psr\Container\ContainerInterface;
1515
use Psr\Log\LoggerInterface;
1616
use Symfony\Component\Console\Attribute\AsCommand;
17+
use Symfony\Component\Console\Command\AlarmableCommandInterface;
1718
use Symfony\Component\Console\Command\Command;
1819
use Symfony\Component\Console\Command\SignalableCommandInterface;
1920
use Symfony\Component\Console\Completion\CompletionInput;
@@ -41,8 +42,10 @@
4142
* @author Samuel Roze <samuel.roze@gmail.com>
4243
*/
4344
#[AsCommand(name: 'messenger:consume', description: 'Consume messages')]
44-
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
45+
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface, AlarmableCommandInterface
4546
{
47+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
48+
4649
private RoutableMessageBus $routableBus;
4750
private ContainerInterface $receiverLocator;
4851
private EventDispatcherInterface $eventDispatcher;
@@ -53,6 +56,7 @@ class ConsumeMessagesCommand extends Command implements SignalableCommandInterfa
5356
private ?ContainerInterface $rateLimiterLocator;
5457
private ?array $signals;
5558
private ?Worker $worker = null;
59+
private ?int $keepaliveInterval = null;
5660

5761
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null, array $signals = null)
5862
{
@@ -85,6 +89,7 @@ protected function configure(): void
8589
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
8690
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
8791
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
92+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
8893
])
8994
->setHelp(<<<'EOF'
9095
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -294,6 +299,25 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
294299
return false;
295300
}
296301

302+
public function getAlarmTime(InputInterface $input, OutputInterface $output): int
303+
{
304+
return $this->keepaliveInterval ??= $input->hasParameterOption('--keepalive')
305+
? (int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL) : 0;
306+
}
307+
308+
public function handleAlarm(false|int $previousExitCode = 0): int|false
309+
{
310+
if (!$this->worker) {
311+
return false;
312+
}
313+
314+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
315+
316+
$this->worker->keepalive();
317+
318+
return false;
319+
}
320+
297321
private function convertToBytes(string $memoryLimit): int
298322
{
299323
$memoryLimit = strtolower($memoryLimit);

src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Psr\Log\LoggerInterface;
1515
use Symfony\Component\Console\Attribute\AsCommand;
16+
use Symfony\Component\Console\Command\AlarmableCommandInterface;
1617
use Symfony\Component\Console\Command\SignalableCommandInterface;
1718
use Symfony\Component\Console\Exception\RuntimeException;
1819
use Symfony\Component\Console\Input\InputArgument;
@@ -37,15 +38,18 @@
3738
* @author Ryan Weaver <ryan@symfonycasts.com>
3839
*/
3940
#[AsCommand(name: 'messenger:failed:retry', description: 'Retry one or more messages from the failure transport')]
40-
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface
41+
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface, AlarmableCommandInterface
4142
{
43+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
44+
4245
private EventDispatcherInterface $eventDispatcher;
4346
private MessageBusInterface $messageBus;
4447
private ?LoggerInterface $logger;
4548
private ?array $signals;
4649
private bool $shouldStop = false;
4750
private bool $forceExit = false;
4851
private ?Worker $worker = null;
52+
private ?int $keepaliveInterval = null;
4953

5054
public function __construct(?string $globalReceiverName, ServiceProviderInterface $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, PhpSerializer $phpSerializer = null, array $signals = null)
5155
{
@@ -64,6 +68,7 @@ protected function configure(): void
6468
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
6569
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
6670
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
71+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
6772
])
6873
->setHelp(<<<'EOF'
6974
The <info>%command.name%</info> retries message in the failure transport.
@@ -155,6 +160,25 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
155160
return $this->forceExit ? 0 : false;
156161
}
157162

163+
public function getAlarmTime(InputInterface $input, OutputInterface $output): int
164+
{
165+
return $this->keepaliveInterval ??= $input->hasParameterOption('--keepalive')
166+
? (int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL) : 0;
167+
}
168+
169+
public function handleAlarm(false|int $previousExitCode = 0): int|false
170+
{
171+
if (!$this->worker) {
172+
return false;
173+
}
174+
175+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
176+
177+
$this->worker->keepalive();
178+
179+
return false;
180+
}
181+
158182
private function runInteractive(string $failureTransportName, SymfonyStyle $io, bool $shouldForce): void
159183
{
160184
$receiver = $this->failureTransports->get($failureTransportName);

0 commit comments

Comments
 (0)
0