Description
Symfony version(s) affected
6.3 6.4
Description
The message generator appears to store stateful data by loading and saving the checkpoint with the cache.
But if the worker is configured to stop after X message(s), once the last message is handled, the (messenger) worker will break its loop and stop. In this case, the execution stop and the stateful data of the message (time of last execution for example) will not be saved.
So when the consumer restarts it will re-handle the message because the last execution was not stored.
This bug appears to break the stateful behavior 🤔
MessageGenerator.php
public function getMessages(): \Generator
{
$checkpoint = $this->checkpoint();
// ...
while (!$heap->isEmpty() && $heap->top()[0] <= $now) {
// ...
if ($yield) {
$context = new MessageContext($this->name, $id, $trigger, $time, $nextTime);
foreach ($recurringMessage->getMessages($context) as $message) {
yield $context => $message; // Message is handled here
}
$checkpoint->save($time, $index); // Checkpoint is saved here, /!\ not reached if worker has stopped !
}
}
// ...
}
How to reproduce
Create a simple stateful schedule provider like below :
#[AsSchedule]
readonly class ScheduleProvider implements ScheduleProviderInterface
{
public function __construct(
private CacheInterface $cache,
private LockFactory $lockFactory
) {
}
public function getSchedule(): Schedule
{
$schedule = (new Schedule())
->stateful($this->cache)
->lock($this->lockFactory->createLock('surface_scheduler'));
$schedule->add(RecurringMessage::every('10 seconds', new TestMessage(data: 'Cron', duration: 1)));
return $schedule;
}
}
Run the following command :
console messenger:consume scheduler_default --limit 1
The worker will wait 10 seconds, handle the message and stop.
Just after that, rerun the command, and you will see that the message will immediately be re-executed.
Exemple
$ console messenger:consume scheduler_default --limit 1
[OK] Consuming messages from transport "scheduler_default".
6B4B
// The worker will automatically exit once it has processed 1 messages or received a stop signal via the
// messenger:stop-workers command.
Message received: Cron at 2023-10-25 11:06:00 // First execution -> OK
$ console messenger:consume scheduler_default --limit 1
[OK] Consuming messages from transport "scheduler_default".
// The worker will automatically exit once it has processed 1 messages or received a stop signal via the
// messenger:stop-workers command.
Message received: Cron at 2023-10-25 11:06:03 // Second execution -> KO, did not wait 10 seconds since last time
$ console messenger:consume scheduler_default --limit 1
[OK] Consuming messages from transport "scheduler_default".
// The worker will automatically exit once it has processed 1 messages or received a stop signal via the
// messenger:stop-workers command.
Message received: Cron at 2023-10-25 11:06:06 // Third execution -> KO, idem
Possible Solution
The dumbest solution would be to move the break
in the Messenger Worker.php
like below.
But I'm not sure if it would have any side effects.
foreach ($envelopes as $envelope) {
+ if ($this->shouldStop) {
+ break 2;
+ }
$envelopeHandled = true;
$this->rateLimit($transportName);
$this->handleMessage($envelope, $transportName);
$this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false));
- if ($this->shouldStop) {
- break 2;
- }
}
Additional Context
No response