8000 [Scheduler] add PRE_RUN and POST_RUN events · symfony/symfony@20fd21a · GitHub
[go: up one dir, main page]

Skip to content

Commit 20fd21a

Browse files
committed
[Scheduler] add PRE_RUN and POST_RUN events
1 parent c6a9dde commit 20fd21a

File tree

12 files changed

+319
-11
lines changed

12 files changed

+319
-11
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/scheduler.php

+7
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\DependencyInjection\Loader\Configurator;
1313

14+
use Symfony\Component\Scheduler\EventListener\DispatchSchedulerEventListener;
1415
use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory;
1516
use Symfony\Component\Scheduler\Messenger\ServiceCallMessageHandler;
1617

@@ -27,5 +28,11 @@
2728
service('clock'),
2829
])
2930
->tag('messenger.transport_factory')
31+
->set('scheduler.event_listener', DispatchSchedulerEventListener::class)
32+
->args([
33+
tagged_locator('scheduler.schedule_provider', 'name'),
34+
service('event_dispatcher'),
35+
])
36+
->tag('kernel.event_subscriber')
3037
;
3138
};

src/Symfony/Component/Scheduler/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ CHANGELOG
1313
* Add `ScheduledStamp` to `RedispatchMessage`
1414
* Allow modifying Schedule instances at runtime
1515
* Add `MessageProviderInterface` to trigger unique messages at runtime
16+
* Add `PreRunEvent` and `PostRunEvent` events
17+
* Add `DispatchSchedulerEventListener`
1618

1719
6.3
1820
---

src/Symfony/Component/Scheduler/DependencyInjection/AddScheduleMessengerPass.php

+4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ class AddScheduleMessengerPass implements CompilerPassInterface
3030
{
3131
public function process(ContainerBuilder $container): void
3232
{
33+
if (!$container->has('event_dispatcher')) {
34+
$container->removeDefinition('scheduler.event_listener');
35+
}
36+
3337
$receivers = [];
3438
foreach ($container->findTaggedServiceIds('messenger.receiver') as $tags) {
3539
$receivers[$tags[0]['alias']] = true;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Event;
13+
14+
use Symfony\Component\Scheduler\Generator\MessageContext;
15+
use Symfony\Component\Scheduler\ScheduleProviderInterface;
16+
17+
class PostRunEvent
18+
{
19+
public function __construct(
20+
private readonly ScheduleProviderInterface $schedule,
21+
private readonly MessageContext $messageContext,
22+
private readonly object $message,
23+
) {
24+
}
25+
26+
public function getMessageContext(): MessageContext
27+
{
28+
return $this->messageContext;
29+
}
30+
31+
public function getSchedule(): ScheduleProviderInterface
32+
{
33+
return $this->schedule;
34+
}
35+
36+
public function getMessage(): object
37+
{
38+
return $this->message;
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Event;
13+
14+
use Symfony\Component\Scheduler\Generator\MessageContext;
15+
use Symfony\Component\Scheduler\ScheduleProviderInterface;
16+
17+
class PreRunEvent
18+
{
19+
private bool $shouldCancel = false;
20+
21+
public function __construct(
22+
private readonly ScheduleProviderInterface $schedule,
23+
private readonly MessageContext $messageContext,
24+
private readonly object $message,
25+
) {
26+
}
27+
28+
public function getMessageContext(): MessageContext
29+
{
30+
return $this->messageContext;
31+
}
32+
33+
public function getSchedule(): ScheduleProviderInterface
34+
{
35+
return $this->schedule;
36+
}
37+
38+
public function getMessage(): object
39+
{
40+
return $this->message;
41+
}
42+
43+
public function shouldCancel(bool $shouldCancel = null): bool
44+
{
45+
if (null !== $shouldCancel) {
46+
$this->shouldCancel = $shouldCancel;
47+
}
48+
49+
return $this->shouldCancel;
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\EventListener;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Psr\EventDispatcher\EventDispatcherInterface;
16+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
17+
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
19+
use Symfony\Component\Scheduler\Event\PostRunEvent;
20+
use Symfony\Component\Scheduler\Event\PreRunEvent;
21+
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
22+
23+
class DispatchSchedulerEventListener implements EventSubscriberInterface
24+
{
25+
public function __construct(
26+
private readonly ContainerInterface $scheduleProviderLocator,
27+
private readonly EventDispatcherInterface $eventDispatcher,
28+
) {
29+
}
30+
31+
public function onMessageHandled(WorkerMessageHandledEvent $event): void
32+
{
33+
$envelope = $event->getEnvelope();
34+
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
35+
return;
36+
}
37+
38+
if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) {
39+
return;
40+
}
41+
42+
$this->eventDispatcher->dispatch(new PostRunEvent($this->scheduleProviderLocator->get($scheduledStamp->messageContext->name), $scheduledStamp->messageContext, $envelope->getMessage()));
43+
}
44+
45+
public function onMessageReceived(WorkerMessageReceivedEvent $event): void
46+
{
47+
$envelope = $event->getEnvelope();
48+
49+
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
50+
return;
51+
}
52+
53+
if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) {
54+
return;
55+
}
56+
57+
$preRunEvent = new PreRunEvent($this->scheduleProviderLocator->get($scheduledStamp->messageContext->name), $scheduledStamp->messageContext, $envelope->getMessage());
58+
59+
$this->eventDispatcher->dispatch($preRunEvent);
60+
61+
if ($preRunEvent->shouldCancel()) {
62+
$event->shouldHandle(false);
63+
}
64+
}
65+
66+
public static function getSubscribedEvents(): array
67+
{
68+
return [
69+
WorkerMessageReceivedEvent::class => ['onMessageReceived'],
70+
WorkerMessageHandledEvent::class => ['onMessageHandled'],
71+
];
72+
}
73+
}

src/Symfony/Component/Scheduler/Generator/MessageGenerator.php

+7-7
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ public function getMessages(): \Generator
9393
$checkpoint->release($now, $this->waitUntil);
9494
}
9595

96+
public function getSchedule(): Schedule
97+
{
98+
return $this->schedule ??= $this->scheduleProvider->getSchedule();
99+
}
100+
96101
private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): TriggerHeap
97102
{
98103
if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) {
@@ -101,7 +106,7 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime):
101106

102107
$heap = new TriggerHeap($time);
103108

104-
foreach ($this->schedule()->getRecurringMessages() as $index => $recurringMessage) {
109+
foreach ($this->getSchedule()->getRecurringMessages() as $index => $recurringMessage) {
105110
$trigger = $recurringMessage->getTrigger();
106111

107112
if ($trigger instanceof StatefulTriggerInterface) {
@@ -118,13 +123,8 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime):
118123
return $this->triggerHeap = $heap;
119124
}
120125

121-
private function schedule(): Schedule
122-
{
123-
return $this->schedule ??= $this->scheduleProvider->getSchedule();
124-
}
125-
126126
private function checkpoint(): Checkpoint
127127
{
128-
return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->schedule()->getLock(), $this->schedule()->getState());
128+
return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->getSchedule()->getLock(), $this->getSchedule()->getState());
129129
}
130130
}

src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php

+5
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,9 @@ public function send(Envelope $envelope): Envelope
5454
{
5555
throw new LogicException(sprintf('"%s" cannot send messages.', __CLASS__));
5656
}
57+
58+
public function getMessageGenerator(): MessageGeneratorInterface
59+
{
60+
return $this->messageGenerator;
61+
}
5762
}

src/Symfony/Component/Scheduler/Schedule.php

+35-2
10000
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,29 @@
1111

1212
namespace Symfony\Component\Scheduler;
1313

14+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
1415
use Symfony\Component\Lock\LockInterface;
16+
use Symfony\Component\Scheduler\Event\PostRunEvent;
17+
use Symfony\Component\Scheduler\Event\PreRunEvent;
1518
use Symfony\Component\Scheduler\Exception\LogicException;
1619
use Symfony\Contracts\Cache\CacheInterface;
1720

1821
final class Schedule implements ScheduleProviderInterface
1922
{
23+
public function __construct(
24+
private readonly ?EventDispatcherInterface $dispatcher = null,
25+
) {
26+
}
27+
2028
/** @var array<string,RecurringMessage> */
2129
private array $messages = [];
2230
private ?LockInterface $lock = null;
2331
private ?CacheInterface $state = null;
2432
private bool $shouldRestart = false;
2533

26-
public static function with(RecurringMessage $message, RecurringMessage ...$messages): static
34+
public function with(RecurringMessage $message, RecurringMessage ...$messages): static
2735
{
28-
return static::doAdd(new self(), $message, ...$messages);
36+
return static::doAdd(new self($this->dispatcher), $message, ...$messages);
2937
}
3038

3139
/**
@@ -62,6 +70,17 @@ public function remove(RecurringMessage $message): static
6270
return $this;
6371
}
6472

73+
/**
74+
* @return $this
75+
*/
76+
public function removeById(string $id): static
77+
{
78+
unset($this->messages[$id]);
79+
$this->setRestart(true);
80+
81+
return $this;
82+
}
83+
6584
/**
6685
* @return $this
6786
*/
@@ -119,6 +138,20 @@ public function getSchedule(): static
119138
return $this;
120139
}
121140

141+
public function before(callable $listener, int $priority = 0): static
142+
{
143+
$this->dispatcher->addListener(PreRunEvent::class, $listener, $priority);
144+
145+
return $this;
146+
}
147+
148+
public function after(callable $listener, int $priority = 0): static
149+
{
150+
$this->dispatcher->addListener(PostRunEvent::class, $listener, $priority);
151+
152+
return $this;
153+
}
154+
122155
public function shouldRestart(): bool
123156
{
124157
return $this->shouldRestart;

src/Symfony/Component/Scheduler/Scheduler.php

+21-1
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@
1111

1212
namespace Symfony\Component\Scheduler;
1313

14+
use Psr\EventDispatcher\EventDispatcherInterface;
1415
use Symfony\Component\Clock\Clock;
1516
use Symfony\Component\Clock\ClockInterface;
17+
use Symfony\Component\Scheduler\Event\PostRunEvent;
18+
use Symfony\Component\Scheduler\Event\PreRunEvent;
1619
use Symfony\Component\Scheduler\Generator\MessageGenerator;
1720

1821
final class Scheduler
@@ -31,6 +34,7 @@ public function __construct(
3134
private readonly array $handlers,
3235
array $schedules,
3336
private readonly ClockInterface $clock = new Clock(),
37+
private readonly ?EventDispatcherInterface $dispatcher = null,
3438
) {
3539
foreach ($schedules as $schedule) {
3640
$this->addSchedule($schedule);
@@ -62,9 +66,25 @@ public function run(array $options = []): void
6266

6367
$ran = false;
6468
foreach ($this->generators as $generator) {
65-
foreach ($generator->getMessages() as $message) {
69+
foreach ($generator->getMessages() as $context => $message) {
70+
if (!$this->dispatcher) {
71+
$this->handlers[$message::class]($message);
72+
$ran = true;
73+
74+
continue;
75+
}
76+
77+
$preRunEvent = new PreRunEvent($generator->getSchedule(), $context, $message);
78+
$this->dispatcher->dispatch($preRunEvent);
79+
80+
if ($preRunEvent->shouldCancel()) {
81+
continue;
82+
}
83+
6684
$this->handlers[$message::class]($message);
6785
$ran = true;
86+
87+
$this->dispatcher->dispatch(new PostRunEvent($generator->getSchedule(), $context, $message));
6888
}
6989
}
7090

0 commit comments

Comments
 (0)
0