[Scheduler] Stateful checkpoint is not saved if the worker has stopped · Issue #52288 · symfony/symfony · GitHub
[Scheduler] Stateful checkpoint is not saved if the worker has stopped #52288
Closed
@FrancoisPog

Symfony version(s) affected

6.3, 6.4

Description

The message generator appears to store stateful data by loading and saving the checkpoint with the cache.

But if the worker is configured to stop after X message(s), then once the last message is handled, the (Messenger) worker breaks out of its loop and stops. In this case, execution stops before the generator is resumed, so the stateful data of the message (the time of the last execution, for example) is not saved.

So when the consumer restarts, it re-handles the message because the last execution was not stored.
This bug appears to break the stateful behavior 🤔

MessageGenerator.php

    public function getMessages(): \Generator
    {
        $checkpoint = $this->checkpoint();
        // ... 
        while (!$heap->isEmpty() && $heap->top()[0] <= $now) {
            // ...
            if ($yield) {
                $context = new MessageContext($this->name, $id, $trigger, $time, $nextTime);
                foreach ($recurringMessage->getMessages($context) as $message) {
                    yield $context => $message; // Message is handled here
                }

                $checkpoint->save($time, $index); // Checkpoint is saved here; NOT reached if the worker has stopped!
            }
        }
        // ...
    }
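
The behaviour in the excerpt above can be reproduced in plain PHP, without Symfony. The following is only a minimal sketch: `DemoCheckpoint` and `demoMessages()` are hypothetical stand-ins, not Scheduler classes. It relies on the fact that code placed after a `yield` only runs when the consumer asks the generator for its next value, so a consumer that breaks right after handling a message never triggers the save.

```php
<?php
// Hypothetical stand-in for the scheduler checkpoint: it only records what was saved.
final class DemoCheckpoint
{
    /** @var list<string> */
    public array $saved = [];

    public function save(string $time): void
    {
        $this->saved[] = $time;
    }
}

// Same shape as the excerpt: yield the message first, save the checkpoint afterwards.
function demoMessages(DemoCheckpoint $checkpoint): \Generator
{
    foreach (['11:06:00', '11:06:10'] as $time) {
        yield $time => 'Cron';
        // Only runs once the consumer resumes the generator.
        $checkpoint->save($time);
    }
}

// Consumer that stops right after handling one message, similar to --limit 1.
$checkpoint = new DemoCheckpoint();
foreach (demoMessages($checkpoint) as $time => $message) {
    // The message would be handled here, then the loop is left immediately.
    break;
}

var_dump($checkpoint->saved); // empty: the save after the yield was never reached
```

Because the generator is never resumed after the first `yield`, nothing is saved, which matches the behaviour reported here.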

How to reproduce

Create a simple stateful schedule provider like the one below:

#[AsSchedule]
readonly class ScheduleProvider implements ScheduleProviderInterface
{
    public function __construct(
        private CacheInterface $cache,
        private LockFactory $lockFactory
    ) {
    }

    public function getSchedule(): Schedule
    {
        $schedule = (new Schedule())
            ->stateful($this->cache)
            ->lock($this->lockFactory->createLock('surface_scheduler'));

        $schedule->add(RecurringMessage::every('10 seconds', new TestMessage(data: 'Cron', duration: 1)));

        return $schedule;
    }
}

Run the following command:

console messenger:consume scheduler_default --limit 1

The worker will wait 10 seconds, handle the message and stop.

Just after that, rerun the command, and you will see that the message will immediately be re-executed.

Example

$ console messenger:consume scheduler_default --limit 1
                                                                                                          
[OK] Consuming messages from transport "scheduler_default".                                                            
                                                                   
// The worker will automatically exit once it has processed 1 messages or received a stop signal via the               
// messenger:stop-workers command.                                                                                                                    

Message received: Cron at 2023-10-25 11:06:00 // First execution -> OK


$ console messenger:consume scheduler_default --limit 1
                                                                                                   
[OK] Consuming messages from transport "scheduler_default".                                                            
                                                                                                                       
// The worker will automatically exit once it has processed 1 messages or received a stop signal via the               
// messenger:stop-workers command.                                                                                     

Message received: Cron at 2023-10-25 11:06:03 // Second execution -> not OK, did not wait 10 seconds since the last run


$ console messenger:consume scheduler_default --limit 1
                                                                                                                       
[OK] Consuming messages from transport "scheduler_default".                                                            
                                                                                                                       
// The worker will automatically exit once it has processed 1 messages or received a stop signal via the               
// messenger:stop-workers command.                                              

Message received: Cron at 2023-10-25 11:06:06 // Third execution -> not OK, same problem

Possible Solution

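The most naive solution would be to move the break in the Messenger Worker.php, as in the diff below, so that the stop check runs at the start of the next iteration instead of right after the message is handled.
But I'm not sure whether it would have any side effects.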

foreach ($envelopes as $envelope) {
+   if ($this->shouldStop) {
+     break 2;
+   }

    $envelopeHandled = true;

    $this->rateLimit($transportName);
    $this->handleMessage($envelope, $transportName);
    $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false));

-   if ($this->shouldStop) {
-     break 2;
-   }
}
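
To illustrate why moving the check could help, here is a minimal plain-PHP sketch. It is not the actual Worker code: `SketchCheckpoint`, `sketchMessages()` and the `$shouldStop` variable are hypothetical stand-ins. With the stop check at the top of the loop body, `foreach` resumes the generator before the check runs, so the statement after the `yield` is executed.

```php
<?php
// Hypothetical stand-in for the scheduler checkpoint: it only records what was saved.
final class SketchCheckpoint
{
    /** @var list<string> */
    public array $saved = [];

    public function save(string $time): void
    {
        $this->saved[] = $time;
    }
}

// Yields a message, then saves the checkpoint once the generator is resumed.
function sketchMessages(SketchCheckpoint $checkpoint): \Generator
{
    foreach (['11:06:00', '11:06:10'] as $time) {
        yield $time => 'Cron';
        $checkpoint->save($time);
    }
}

$checkpoint = new SketchCheckpoint();
$handled = [];
$shouldStop = false;

foreach (sketchMessages($checkpoint) as $time => $message) {
    // Stop check moved to the top: reaching this line again means the
    // generator has already been resumed past the previous yield.
    if ($shouldStop) {
        break;
    }

    $handled[] = $time;   // stands in for handling the message
    $shouldStop = true;   // simulates the worker reaching its --limit of 1
}

var_dump($handled);            // one handled message: 11:06:00
var_dump($checkpoint->saved);  // its checkpoint was saved: 11:06:00
```

One thing this sketch makes visible: the generator produces the second message before the loop discards it. Here that message is not saved to the checkpoint, so a later run would yield it again, but whether the real Scheduler and Worker behave the same way would need to be checked.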

Additional Context

No response
