8000 [Messenger] Notify transports which messages are still being processe… · symfony/symfony@da22530 · GitHub
[go: up one dir, main page]

Skip to content

Commit da22530

Browse files
committed
[Messenger] Notify transports which messages are still being processed, using pcntl_alarm()
1 parent 16b0fd9 commit da22530

File tree

15 files changed

+275
-8
lines changed

15 files changed

+275
-8
lines changed

src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.2
5+
---
6+
7+
* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying Beanstalkd that the job is still being processed, in order to avoid timeouts
8+
49
5.2.0
510
-----
611

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1717
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1818
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
19+
use Symfony\Component\Messenger\Envelope;
1920
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -78,6 +79,17 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7879
$receiver->get();
7980
}
8081

82+
public function testKeepalive()
83+
{
84+
$serializer = $this->createSerializer();
85+
86+
$connection = $this->createMock(Connection::class);
87+
$connection->expects($this->once())->method('keepalive')->with(1);
88+
89+
$receiver = new BeanstalkdReceiver($connection, $serializer);
90+
$receiver->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
91+
}
92+
8193
private function createBeanstalkdEnvelope(): array
8294
{
8395
return [

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdTransportTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransport;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
@@ -50,6 +51,18 @@ public function testReceivesMessages()
5051
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5152
}
5253

54+
public function testKeepalive()
55+
{
56+
$transport = $this->getTransport(
57+
null,
58+
$connection = $this->createMock(Connection::class),
59+
);
60+
61+
$connection->expects($this->once())->method('keepalive')->with(1);
62+
63+
$transport->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
64+
}
65+
5366
private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): BeanstalkdTransport
5467
{
5568
$serializer ??= $this->createMock(SerializerInterface::class);

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,37 @@ public function testSendWhenABeanstalkdExceptionOccurs()
330330

331331
$connection->send($body, $headers, $delay);
332332
}
333+
334+
public function testKeepalive()
335+
{
336+
$id = 123456;
337+
338+
$tube = 'baz';
339+
340+
$client = $this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
342+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
343+
344+
$connection = new Connection(['tube_name' => $tube], $client);
345+
346+
$connection->keepalive((string) $id);
347+
}
348+
349+
public function testKeepaliveWhenABeanstalkdExceptionOccurs()
350+
{
351+
$id = 123456;
352+
353+
$tube = 'baz123';
354+
355+
$exception = new ServerException('baz error');
356+
357+
$client = $this->createMock(PheanstalkInterface::class);
358+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
359+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
360+
361+
$connection = new Connection(['tube_name' => $tube], $client);
362+
363+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
364+
$connection->keepalive((string) $id);
365+
}
333366
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
18-
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1919
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2020
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2121

2222
/**
2323
* @author Antonio Pauletich <antonio.pauletich95@gmail.com>
2424
*/
25-
class BeanstalkdReceiver implements ReceiverInterface, MessageCountAwareInterface
25+
class BeanstalkdReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface
2626
{
2727
private SerializerInterface $serializer;
2828

@@ -65,6 +65,11 @@ public function reject(Envelope $envelope): void
6565
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
6666
}
6767

68+
public function keepalive(Envelope $envelope): void
69+
{
70+
$this->connection->keepalive($this->findBeanstalkdReceivedStamp($envelope)->getId());
71+
}
72+
6873
public function getMessageCount(): int
6974
{
7075
return $this->connection->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1718
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -20,7 +21,7 @@
2021
/**
2122
* @author Antonio Pauletich <antonio.pauletich95@gmail.com>
2223
*/
23-
class BeanstalkdTransport implements TransportInterface, MessageCountAwareInterface
24+
class BeanstalkdTransport implements TransportInterface, KeepaliveReceiverInterface, MessageCountAwareInterface
2425
{
2526
private SerializerInterface $serializer;
2627
private BeanstalkdReceiver $receiver;
@@ -48,6 +49,11 @@ public function reject(Envelope $envelope): void
4849
$this->getReceiver()->reject($envelope);
4950
}
5051

52+
public function keepalive(Envelope $envelope): void
53+
{
54+
$this->getReceiver()->keepalive($envelope);
55+
}
56+
5157
public function getMessageCount(): int
5258
{
5359
return $this->getReceiver()->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ public function reject(string $id): void
180180
}
181181
}
182182

183+
public function keepalive(string $id): void
184+
{
185+
try {
186+
$this->client->useTube($this->tube)->touch(new JobId((int) $id));
187+
} catch (Exception $exception) {
188+
throw new TransportException($exception->getMessage(), 0, $exception);
189+
}
190+
}
191+
183192
public function getMessageCount(): int
184193
{
185194
try {

src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php": ">=8.2",
1616
"pda/pheanstalk": "^4.0",
17-
"symfony/messenger": "^6.4|^7.0"
17+
"symfony/messenger": "^7.2"
1818
},
1919
"require-dev": {
2020
"symfony/property-access": "^6.4|^7.0",

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ CHANGELOG
77
* `WrappedExceptionsInterface` now extends PHP's `Throwable` interface
88
* Add `#[AsMessage]` attribute with `$transport` parameter for message routing
99
* Add `--format` option to the `messenger:stats` command
10+
* Add the ability to asynchronously notify transports about which messages are still being processed by the handler, using `pcntl_alarm()`
1011

1112
7.1
1213
---

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Psr\Container\ContainerInterface;
1515
use Psr\Log\LoggerInterface;
1616
use Symfony\Component\Console\Attribute\AsCommand;
17+
use Symfony\Component\Console\Command\AlarmableCommandInterface;
1718
use Symfony\Component\Console\Command\Command;
1819
use Symfony\Component\Console\Command\SignalableCommandInterface;
1920
use Symfony\Component\Console\Completion\CompletionInput;
@@ -41,9 +42,12 @@
4142
* @author Samuel Roze <samuel.roze@gmail.com>
4243
*/
4344
#[AsCommand(name: 'messenger:consume', description: 'Consume messages')]
44-
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
45+
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface, AlarmableCommandInterface
4546
{
47+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
48+
4649
private ?Worker $worker = null;
50+
private ?int $keepaliveInterval = null;
4751

4852
public function __construct(
4953
private RoutableMessageBus $routableBus,
@@ -75,6 +79,7 @@ protected function configure(): void
7579
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
7680
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
7781
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
82+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
7883
])
7984
->setHelp(<<<'EOF'
8085
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -280,6 +285,25 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
280285
return false;
281286
}
282287

288+
public function getAlarmInterval(InputInterface $input): int
289+
{
290+
return $this->keepaliveInterval ??= $input->hasParameterOption('--keepalive')
291+
? (int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL) : 0;
292+
}
293+
294+
public function handleAlarm(false|int $previousExitCode = 0): int|false
295+
{
296+
if (!$this->worker) {
297+
return false;
298+
}
299+
300+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
301+
302+
$this->worker->keepalive();
303+
304+
return false;
305+
}
306+
283307
private function convertToBytes(string $memoryLimit): int
284308
{
285309
$memoryLimit = strtolower($memoryLimit);

src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Psr\Log\LoggerInterface;
1515
use Symfony\Component\Console\Attribute\AsCommand;
16+
use Symfony\Component\Console\Command\AlarmableCommandInterface;
1617
use Symfony\Component\Console\Command\SignalableCommandInterface;
1718
use Symfony\Component\Console\Exception\RuntimeException;
1819
use Symfony\Component\Console\Input\InputArgument;
@@ -37,11 +38,14 @@
3738
* @author Ryan Weaver <ryan@symfonycasts.com>
3839
*/
3940
#[AsCommand(name: 'messenger:failed:retry', description: 'Retry one or more messages from the failure transport')]
40-
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface
41+
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface, AlarmableCommandInterface
4142
{
43+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
44+
4245
private bool $shouldStop = false;
4346
private bool $forceExit = false;
4447
private ?Worker $worker = null;
48+
private ?int $keepaliveInterval = null;
4549

4650
public function __construct(
4751
?string $globalReceiverName,
@@ -62,6 +66,7 @@ protected function configure(): void
6266
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
6367
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
6468
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
69+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
6570
])
6671
->setHelp(<<<'EOF'
6772
The <info>%command.name%</info> retries message in the failure transport.
@@ -149,6 +154,25 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
149154
return $this->forceExit ? 0 : false;
150155
}
151156

157+
public function getAlarmInterval(InputInterface $input): int
158+
{
159+
return $this->keepaliveInterval ??= $input->hasParameterOption('--keepalive')
160+
? (int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL) : 0;
161+
}
162+
163+
public function handleAlarm(false|int $previousExitCode = 0): int|false
164+
{
165+
if (!$this->worker) {
166+
return false;
167+
}
168+
169+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
170+
171+
$this->worker->keepalive();
172+
173+
return false;
174+
}
175+
152176
private function runInteractive(string $failureTransportName, SymfonyStyle $io, bool $shouldForce): void
153177
{
154178
$receiver = $this->failureTransports->get($failureTransportName);

0 commit comments

Comments
 (0)
0