8000 Fix stateful scheduler · symfony/symfony@2d5856b · GitHub
[go: up one dir, main page]

Skip to content

Commit 2d5856b

Browse files
committed
Fix stateful scheduler
1 parent 7c833ee commit 2d5856b

10 files changed

+75
-26
lines changed

src/Symfony/Component/Scheduler/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ CHANGELOG
1010
* Add `AbstractTriggerDecorator`
1111
* Make `ScheduledStamp` "send-able"
1212
* Add `ScheduledStamp` to `RedispatchMessage`
13+
* Add `from()` to `CheckpointInterface`
1314

1415
6.3
1516
---

src/Symfony/Component/Scheduler/Generator/Checkpoint.php

+13-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
final class Checkpoint implements CheckpointInterface
1818
{
19+
private \DateTimeImmutable $from;
1920
private \DateTimeImmutable $time;
2021
private int $index = -1;
2122
private bool $reset = false;
@@ -41,14 +42,22 @@ public function acquire(\DateTimeImmutable $now): bool
4142
$this->save($now, -1);
4243
}
4344

44-
$this->time ??= $now;
4545
if ($this->cache) {
46-
$this->save(...$this->cache->get($this->name, fn () => [$now, -1]));
46+
[$this->time, $this->index, $this->from] = $this->cache->get($this->name, fn () => [$now, -1, $now]) + [2 => $now];
47+
$this->save($this->time, $this->index);
4748
}
4849

50+
$this->time ??= $now;
51+
$this->from ??= $now;
52+
4953
return true;
5054
}
5155

56+
public function from(): \DateTimeImmutable
57+
{
58+
return $this->from;
59+
}
60+
5261
public function time(): \DateTimeImmutable
5362
{
5463
return $this->time;
@@ -63,7 +72,8 @@ public function save(\DateTimeImmutable $time, int $index): void
6372
{
6473
$this->time = $time;
6574
$this->index = $index;
66-
$this->cache< F438 /span>?->get($this->name, fn () => [$time, $index], \INF);
75+
$this->from ??= $time;
76+
$this->cache?->get($this->name, fn () => [$time, $index, $this->from], \INF);
6777
}
6878

6979
/**

src/Symfony/Component/Scheduler/Generator/CheckpointInterface.php

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ interface CheckpointInterface
1515
{
1616
public function acquire(\DateTimeImmutable $now): bool;
1717

18+
public function from(): \DateTimeImmutable;
19+
1820
public function time(): \DateTimeImmutable;
1921

2022
public function index(): int;

src/Symfony/Component/Scheduler/Generator/MessageGenerator.php

+11-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Scheduler\RecurringMessage;
1717
use Symfony\Component\Scheduler\Schedule;
1818
use Symfony\Component\Scheduler\ScheduleProviderInterface;
19+
use Symfony\Component\Scheduler\Trigger\StatefulTriggerInterface;
1920

2021
final class MessageGenerator implements MessageGeneratorInterface
2122
{
@@ -43,9 +44,10 @@ public function getMessages(): \Generator
4344
return;
4445
}
4546

47+
$startTime = $checkpoint->from();
4648
$lastTime = $checkpoint->time();
4749
$lastIndex = $checkpoint->index();
48-
$heap = $this->heap($lastTime);
50+
$heap = $this->heap($lastTime, $startTime);
4951

5052
while (!$heap->isEmpty() && $heap->top()[0] <= $now) {
5153
/** @var \DateTimeImmutable $time */
@@ -79,7 +81,7 @@ public function getMessages(): \Generator
7981
$checkpoint->release($now, $this->waitUntil);
8082
}
8183

82-
private function heap(\DateTimeImmutable $time): TriggerHeap
84+
private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): TriggerHeap
8385
{
8486
if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) {
8587
return $this->triggerHeap;
@@ -88,7 +90,13 @@ private function heap(\DateTimeImmutable $time): TriggerHeap
8890
$heap = new TriggerHeap($time);
8991

9092
foreach ($this->schedule()->getRecurringMessages() as $index => $recurringMessage) {
91-
if (!$nextTime = $recurringMessage->getTrigger()->getNextRunDate($time)) {
93+
$trigger = $recurringMessage->getTrigger();
94+
95+
if ($trigger instanceof StatefulTriggerInterface) {
96+
$trigger->continue($startTime);
97+
}
98+
99+
if (!$nextTime = $trigger->getNextRunDate($time)) {
92100
continue;
93101
}
94102

src/Symfony/Component/Scheduler/RecurringMessage.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ private function __construct(
4040
* @see https://en.wikipedia.org/wiki/ISO_8601#Durations
4141
* @see https://php.net/datetime.formats.relative
4242
*/
43-
public static function every(string|int|\DateInterval $frequency, object $message, string|\DateTimeImmutable $from = new \DateTimeImmutable(), string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01')): self
43+
public static function every(string|int|\DateInterval $frequency, object $message, string|\DateTimeImmutable|null $from = null, string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01')): self
4444
{
4545
return new self(new PeriodicalTrigger($frequency, $from, $until), $message);
4646
}

src/Symfony/Component/Scheduler/Tests/Generator/CheckpointTest.php

+14-10
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public function testWithStateInitStateOnFirstAcquiring()
4848
$this->assertTrue($checkpoint->acquire($now));
4949
$this->assertEquals($now, $checkpoint->time());
5050
$this->assertEquals(-1, $checkpoint->index());
51-
$this->assertEquals([$now, -1], $cache->get('cache', fn () => []));
51+
$this->assertEquals([$now, -1, $now], $cache->get('cache', fn () => []));
5252
}
5353

5454
public function testWithStateLoadStateOnAcquiring()
@@ -58,10 +58,10 @@ public function testWithStateLoadStateOnAcquiring()
5858

5959
$cache->get('cache', fn () => [$now, 0], \INF);
6060

61-
$this->assertTrue($checkpoint->acquire($now->modify('1 min')));
61+
$this->assertTrue($checkpoint->acquire($startedAt = $now->modify('1 min')));
6262
$this->assertEquals($now, $checkpoint->time());
6363
$this->assertEquals(0, $checkpoint->index());
64-
$this->assertEquals([$now, 0], $cache->get('cache', fn () => []));
64+
$this->assertEquals([$now, 0, $startedAt], $cache->get('cache', fn () => []));
6565
}
6666

6767
public function testWithLockInitStateOnFirstAcquiring()
@@ -72,11 +72,12 @@ public function testWithLockInitStateOnFirstAcquiring()
7272

7373
$this->assertTrue($checkpoint->acquire($now));
7474
$this->assertEquals($now, $checkpoint->time());
75+
$this->assertEquals($now, $checkpoint->from());
7576
$this->assertEquals(-1, $checkpoint->index());
7677
$this->assertTrue($lock->isAcquired());
7778
}
7879

79-
public function testwithLockLoadStateOnAcquiring()
80+
public function testWithLockLoadStateOnAcquiring()
8081
{
8182
$lock = new Lock(new Key('lock'), new InMemoryStore());
8283
$checkpoint = new Checkpoint('dummy', $lock);
@@ -86,6 +87,7 @@ public function testwithLockLoadStateOnAcquiring()
8687

8788
$this->assertTrue($checkpoint->acquire($now->modify('1 min')));
8889
$this->assertEquals($now, $checkpoint->time());
90+
$this->assertEquals($now, $checkpoint->from());
8991
$this->assertEquals(0, $checkpoint->index());
9092
$this->assertTrue($lock->isAcquired());
9193
}
@@ -105,12 +107,13 @@ public function testWithCacheSave()
105107
{
106108
$checkpoint = new Checkpoint('cache', new NoLock(), $cache = new ArrayAdapter());
107109
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
108-
$checkpoint->acquire($now->modify('-1 hour'));
110+
$checkpoint->acquire($startedAt = $now->modify('-1 hour'));
109111
$checkpoint->save($now, 3);
110112

111113
$this->assertSame($now, $checkpoint->time());
112114
$this->assertSame(3, $checkpoint->index());
113-
$this->assertEquals([$now, 3], $cache->get('cache', fn () => []));
115+
$this->assertSame($startedAt, $checkpoint->from());
116+
$this->assertEquals([$now, 3, $startedAt], $cache->get('cache', fn () => []));
114117
}
115118

116119
public function testWithLockSave()
@@ -119,11 +122,12 @@ public function testWithLockSave()
119122
$checkpoint = new Checkpoint('dummy', $lock);
120123
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
121124

122-
$checkpoint->acquire($now->modify('-1 hour'));
125+
$checkpoint->acquire($startTime = $now->modify('-1 hour'));
123126
$checkpoint->save($now, 3);
124127

125128
$this->assertSame($now, $checkpoint->time());
126129
$this->assertSame(3, $checkpoint->index());
130+
$this->assertSame($startTime, $checkpoint->from());
127131
}
128132

129133
public function testWithLockAndCacheSave()
@@ -132,12 +136,12 @@ public function testWithLockAndCacheSave()
132136
$checkpoint = new Checkpoint('dummy', $lock, $cache = new ArrayAdapter());
133137
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
134138

135-
$checkpoint->acquire($now->modify('-1 hour'));
139+
$checkpoint->acquire($startTime = $now->modify('-1 hour'));
136140
$checkpoint->save($now, 3);
137141

138142
$this->assertSame($now, $checkpoint->time());
139143
$this->assertSame(3, $checkpoint->index());
140-
$this->assertEquals([$now, 3], $cache->get('dummy', fn () => []));
144+
$this->assertEquals([$now, 3, $startTime], $cache->get('dummy', fn () => []));
141145
}
142146

143147
public function testWithCacheFullCycle()
@@ -161,7 +165,7 @@ public function testWithCacheFullCycle()
161165
$this->assertSame(3, $lastIndex);
162166
$this->assertEquals($now, $checkpoint->time());
163167
$this->assertSame(0, $checkpoint->index());
164-
$this->assertEquals([$now, 0], $cache->get('cache', fn () => []));
168+
$this->assertEquals([$now, 0, $now], $cache->get('cache', fn () => []));
165169
}
166170

167171
public function testWithLockResetStateAfterLockedAcquiring()

src/Symfony/Component/Scheduler/Tests/Trigger/PeriodicalTriggerTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public static function providerGetNextRunDateAgain(): iterable
184184
yield [
185185
$trigger,
186186
new \DateTimeImmutable('2020-02-20T01:59:00+02:00'),
187-
new \DateTimeImmutable('2020-02-20T02:09:00+02:00'),
187+
new \DateTimeImmutable('2020-02-20T02:00:00+02:00'),
188188
];
189189
yield [
190190
$trigger,

src/Symfony/Component/Scheduler/Trigger/AbstractDecoratedTrigger.php

+8-1
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,19 @@
1414
/**
1515
* @author Kevin Bond <kevinbond@gmail.com>
1616
*/
17-
abstract class AbstractDecoratedTrigger implements TriggerInterface
17+
abstract class AbstractDecoratedTrigger implements StatefulTriggerInterface
1818
{
1919
public function __construct(private TriggerInterface $inner)
2020
{
2121
}
2222

23+
public function continue(\DateTimeImmutable $startedAt): void
24+
{
25+
if ($this->inner instanceof StatefulTriggerInterface) {
26+
$this->inner->continue($startedAt);
27+
}
28+
}
29+
2330
final public function inner(): TriggerInterface
2431
{
2532
$inner = $this->inner;

src/Symfony/Component/Scheduler/Trigger/PeriodicalTrigger.php

+16-7
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,18 @@
1313

1414
use Symfony\Component\Scheduler\Exception\InvalidArgumentException;
1515

16-
class PeriodicalTrigger implements TriggerInterface
16+
class PeriodicalTrigger implements StatefulTriggerInterface
1717
{
1818
private float $intervalInSeconds = 0.0;
19-
private \DateTimeImmutable $from;
19+
private ?\DateTimeImmutable $from;
2020
private \DateTimeImmutable $until;
2121
private \DatePeriod $period;
2222
private string $description;
23+
private string|int|float|\DateInterval $interval;
2324

2425
public function __construct(
2526
string|int|float|\DateInterval $interval,
26-
string|\DateTimeImmutable $from = new \DateTimeImmutable(),
27+
string|\DateTimeImmutable|null $from = null,
2728
string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01'),
2829
) {
2930
$this->from = \is_string($from) ? new \DateTimeImmutable($from) : $from;
@@ -70,7 +71,7 @@ public function __construct(
7071
$this->description = sprintf('every %s seconds', $this->intervalInSeconds);
7172
}
7273
} else {
73-
$this->period = new \DatePeriod($this->from, $i, $this->until);
74+
$this->interval = $i;
7475
}
7576
} catch (\Exception $e) {
7677
throw new InvalidArgumentException(sprintf('Invalid interval "%s": ', $interval instanceof \DateInterval ? 'instance of \DateInterval' : $interval).$e->getMessage(), 0, $e);
@@ -82,15 +83,22 @@ public function __toString(): string
8283
return $this->description;
8384
}
8485

86+
public function continue(\DateTimeImmutable $startedAt): void
87+
{
88+
$this->from ??= $startedAt;
89+
}
90+
8591
public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
8692
{
93+
$this->from ??= $run;
94+
8795
if ($this->intervalInSeconds) {
8896
if ($this->until <= $run) {
8997
return null;
9098
}
9199

92-
$fromDate = min($this->from, $run);
93-
$from = $fromDate->format('U.u');
100+
$fromDate = $this->from;
101+
$from = (float) $fromDate->format('U.u');
94102
$delta = $run->format('U.u') - $from;
95103
$recurrencesPassed = floor($delta / $this->intervalInSeconds);
96104
$nextRunTimestamp = sprintf('%.6F', ($recurrencesPassed + 1) * $this->intervalInSeconds + $from);
@@ -103,6 +111,7 @@ public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
103111
return $this->until > $nextRun ? $nextRun : null;
104112
}
105113

114+
$this->period ??= new \DatePeriod($this->from, $this->interval, $this->until);
106115
$iterator = $this->period->getIterator();
107116
while ($run >= $next = $iterator->current()) {
108117
$iterator->next();
@@ -130,6 +139,6 @@ private function canBeConvertedToSeconds(\DateInterval $interval): bool
130139

131140
private function calcInterval(\DateInterval $interval): float
132141
{
133-
return $this->from->setTimestamp(0)->add($interval)->format('U.u');
142+
return (float) (new \DateTimeImmutable('@0'))->add($interval)->format('U.u');
134143
}
135144
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Symfony\Component\Scheduler\Trigger;
4+
5+
interface StatefulTriggerInterface extends TriggerInterface
6+
{
7+
public function continue(\DateTimeImmutable $startedAt): void;
8+
}

0 commit comments

Comments
 (0)
0