8000 [Messenger] Notify transports which messages are still being processe… · symfony/symfony@674be99 · GitHub
[go: up one dir, main page]

Skip to content

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 674be99

Browse files
committed
[Messenger] Notify transports which messages are still being processed, using pcntl_alarm()
1 parent e42a38a commit 674be99

File tree

15 files changed

+272
-9
lines changed

15 files changed

+272
-9
lines changed

src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.1
5+
---
6+
7+
* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying Beanstalkd that the job is still being processed, in order to avoid timeouts
8+
49
5.2.0
510
-----
611

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1717
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1818
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
19+
use Symfony\Component\Messenger\Envelope;
1920
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -78,6 +79,17 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7879
$receiver->get();
7980
}
8081

82+
public function testKeepalive()
83+
{
84+
$serializer = $this->createSerializer();
85+
86+
$connection = $this->createMock(Connection::class);
87+
$connection->expects($this->once())->method('keepalive')->with(1);
88+
89+
$receiver = new BeanstalkdReceiver($connection, $serializer);
90+
$receiver->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
91+
}
92+
8193
private function createBeanstalkdEnvelope(): array
8294
{
8395
return [

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdTransportTest.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransport;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
@@ -50,6 +51,18 @@ public function testReceivesMessages()
5051
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5152
}
5253

54+
public function testKeepalive()
55+
{
56+
$transport = $this->getTransport(
57+
null,
58+
$connection = $this->createMock(Connection::class),
59+
);
60+
61+
$connection->expects($this->once())->method('keepalive')->with(1);
62+
63+
$transport->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
64+
}
65+
5366
private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): BeanstalkdTransport
5467
{
5568
$serializer ??= $this->createMock(SerializerInterface::class);

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,37 @@ public function testSendWhenABeanstalkdExceptionOccurs()
330330

331331
$connection->send($body, $headers, $delay);
332332
}
333+
334+
public function testKeepalive()
335+
{
336+
$id = 123456;
337+
338+
$tube = 'baz';
339+
340+
$client = $this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
342+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
343+
344+
$connection = new Connection(['tube_name' => $tube], $client);
345+
346+
$connection->keepalive((string) $id);
347+
}
348+
349+
public function testKeepaliveWhenABeanstalkdExceptionOccurs()
350+
{
351+
$id = 123456;
352+
353+
$tube = 'baz123';
354+
355+
$exception = new ServerException('baz error');
356+
357+
$client = $this->createMock(PheanstalkInterface::class);
358+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
359+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
360+
361+
$connection = new Connection(['tube_name' => $tube], $client);
362+
363+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
364+
$connection->keepalive((string) $id);
365+
}
333366
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
18-
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1919
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2020
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2121

2222
/**
2323
* @author Antonio Pauletich <antonio.pauletich95@gmail.com>
2424
*/
25-
class BeanstalkdReceiver implements ReceiverInterface, MessageCountAwareInterface
25+
class BeanstalkdReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface
2626
{
2727
private Connection $connection;
2828
private SerializerInterface $serializer;
@@ -65,6 +65,11 @@ public function reject(Envelope $envelope): void
6565
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
6666
}
6767

68+
public function keepalive(Envelope $envelope): void
69+
{
70+
$this->connection->keepalive($this->findBeanstalkdReceivedStamp($envelope)->getId());
71+
}
72+
6873
public function getMessageCount(): int
6974
{
7075
return $this->connection->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1718
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -20,7 +21,7 @@
2021
/**
2122
* @author Antonio Pauletich <antonio.pauletich95@gmail.com>
2223
*/
23-
class BeanstalkdTransport implements TransportInterface, MessageCountAwareInterface
24+
class BeanstalkdTransport implements TransportInterface, KeepaliveReceiverInterface, MessageCountAwareInterface
2425
{
2526
private Connection $connection;
2627
private SerializerInterface $serializer;
@@ -48,6 +49,11 @@ public function reject(Envelope $envelope): void
4849
$this->getReceiver()->reject($envelope);
4950
}
5051

52+
public function keepalive(Envelope $envelope): void
53+
{
54+
$this->getReceiver()->keepalive($envelope);
55+
}
56+
5157
public function getMessageCount(): int
5258
{
5359
return $this->getReceiver()->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,15 @@ public function reject(string $id): void
179179
}
180180
}
181181

182+
public function keepalive(string $id): void
183+
{
184+
try {
185+
$this->client->useTube($this->tube)->touch(new JobId((int) $id));
186+
} catch (Exception $exception) {
187+
throw new TransportException($exception->getMessage(), 0, $exception);
188+
}
189+
}
190+
182191
public function getMessageCount(): int
183192
{
184193
try {

src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php": ">=8.2",
1616
"pda/pheanstalk": "^4.0",
17-
"symfony/messenger": "^6.4|^7.0"
17+
"symfony/messenger": "^7.1"
1818
},
1919
"require-dev": {
2020
"symfony/property-access": "^6.4|^7.0",

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ CHANGELOG
88
* Add `--all` option to the `messenger:consume` command
99
* Make `#[AsMessageHandler]` final
1010
* Add parameter `$jitter` to `MultiplierRetryStrategy` in order to randomize delay and prevent the thundering herd effect
11+
* Add the ability to asynchronously notify transports about which messages are still being processed by the handler, using `pcntl_alarm()`
1112

1213
7.0
1314
---

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Psr\Container\ContainerInterface;
1515
use Psr\Log\LoggerInterface;
1616
use Symfony\Component\Console\Attribute\AsCommand;
17+
use Symfony\Component\Console\Command\AlarmableCommandInterface;
1718
use Symfony\Component\Console\Command\Command;
1819
use Symfony\Component\Console\Command\SignalableCommandInterface;
1920
use Symfony\Component\Console\Completion\CompletionInput;
@@ -41,8 +42,10 @@
4142
* @author Samuel Roze <samuel.roze@gmail.com>
4243
*/
4344
#[AsCommand(name: 'messenger:consume', description: 'Consume messages')]
44-
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
45+
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface, AlarmableCommandInterface
4546
{
47+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
48+
4649
private RoutableMessageBus $routableBus;
4750
private ContainerInterface $receiverLocator;
4851
private EventDispatcherInterface $eventDispatcher;
@@ -53,6 +56,7 @@ class ConsumeMessagesCommand extends Command implements SignalableCommandInterfa
5356
private ?ContainerInterface $rateLimiterLocator;
5457
private ?array $signals;
5558
private ?Worker $worker = null;
59+
private ?int $keepaliveInterval = null;
5660

5761
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, ?LoggerInterface $logger = null, array $receiverNames = [], ?ResetServicesListener $resetServicesListener = null, array $busIds = [], ?ContainerInterface $rateLimiterLocator = null, ?array $signals = null)
5862
{
@@ -85,6 +89,7 @@ protected function configure(): void
8589
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
8690
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
8791
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
92+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
8893
])
8994
->setHelp(<<<'EOF'
9095
The <info>%command.name%</info> command consumes messages and dispatches t 10000 hem to the message bus.
@@ -290,6 +295,25 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
290295
return false;
291296
}
292297

298+
public function getAlarmTime(InputInterface $input): int
299+
{
300+
return $this->keepaliveInterval ??= $input->hasParameterOption('--keepalive')
301+
? (int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL) : 0;
302+
}
303+
304+
public function handleAlarm(false|int $previousExitCode = 0): int|false
305+
{
306+
if (!$this->worker) {
307+
return false;
308+
}
309+
310+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
311+
312+
$this->worker->keepalive();
313+
314+
return false;
315+
}
316+
293317
private function convertToBytes(string $memoryLimit): int
294318
{
295319
$memoryLimit = strtolower($memoryLimit);

src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Psr\Log\LoggerInterface;
1515
use Symfony\Component\Console\Attribute\AsCommand;
16+
use Symfony\Component\Console\Command\AlarmableCommandInterface;
1617
use Symfony\Component\Console\Command\SignalableCommandInterface;
1718
use Symfony\Component\Console\Exception\RuntimeException;
1819
use Symfony\Component\Console\Input\InputArgument;
@@ -37,15 +38,18 @@
3738
* @author Ryan Weaver <ryan@symfonycasts.com>
3839
*/
3940
#[AsCommand(name: 'messenger:failed:retry', description: 'Retry one or more messages from the failure transport')]
40-
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface
41+
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface, AlarmableCommandInterface
4142
{
43+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
44+
4245
private EventDispatcherInterface $eventDispatcher;
4346
private MessageBusInterface $messageBus;
4447
private ?LoggerInterface $logger;
4548
private ?array $signals;
4649
private bool $shouldStop = false;
4750
private bool $forceExit = false;
4851
private ?Worker $worker = null;
52+
private ?int $keepaliveInterval = null;
4953

5054
public function __construct(?string $globalReceiverName, ServiceProviderInterface $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, ?LoggerInterface $logger = null, ?PhpSerializer $phpSerializer = null, ?array $signals = null)
5155
{
@@ -64,6 +68,7 @@ protected function configure(): void
6468
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
6569
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
6670
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
71+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
6772
])
6873
->setHelp(<<<'EOF'
6974
The <info>%command.name%</info> retries message in the failure transport.
@@ -151,6 +156,25 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
151156
return $this->forceExit ? 0 : false;
152157
}
153158

159+
public function getAlarmTime(InputInterface $input): int
160+
{
161+
return $this->keepaliveInterval ??= $input->hasParameterOption('--keepalive')
162+
? (int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL) : 0;
163+
}
164+
165+
public function handleAlarm(false|int $previousExitCode = 0): int|false
166+
{
167+
if (!$this->worker) {
168+
return false;
169+
}
170+
171+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
172+
173+
$this->worker->keepalive();
174+
175+
return false;
176+
}
177+
154178
private function runInteractive(string $failureTransportName, SymfonyStyle $io, bool $shouldForce): void
155179
{
156180
$receiver = $this->failureTransports->get($failureTransportName);

0 commit comments

Comments
 (0)
0