8000 feature #51805 [Scheduler] pre_run and post_run events (alli83) · symfony/symfony@50662d0 · GitHub
[go: up one dir, main page]

Skip to content

Commit 50662d0

Browse files
committed
feature #51805 [Scheduler] pre_run and post_run events (alli83)
This PR was merged into the 6.4 branch. Discussion ---------- [Scheduler] pre_run and post_run events | Q | A | ------------- | --- | Branch? | 6.4 | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | #49803 (comment) | License | MIT Based on #49803 `@kbond` and taking into account #51553 The aim of this PR is to be able to act on the accumulated messages 'if something happens' and to be able to recalculate the heap via events (currently pre_run and post_run). The aim is to have access to - the the schedule and therefore add/cancel a certain type of message. - MessageContexte (e.g. access the id) - The message itself This PR would complement `@Jeroeny` #51553 PR by enabling action via events. Commits ------- 20fd21a [Scheduler] add PRE_RUN and POST_RUN events
2 parents c6a9dde + 20fd21a commit 50662d0

File tree

12 files changed

+319
-11
lines changed

12 files changed

+319
-11
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/scheduler.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\DependencyInjection\Loader\Configurator;
1313

14+
use Symfony\Component\Scheduler\EventListener\DispatchSchedulerEventListener;
1415
use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory;
1516
use Symfony\Component\Scheduler\Messenger\ServiceCallMessageHandler;
1617

@@ -27,5 +28,11 @@
2728
service('clock'),
2829
])
2930
->tag('messenger.transport_factory')
31+
->set('scheduler.event_listener', DispatchSchedulerEventListener::class)
32+
->args([
33+
tagged_locator('scheduler.schedule_provider', 'name'),
34+
service('event_dispatcher'),
35+
])
36+
->tag('kernel.event_subscriber')
3037
;
3138
};

src/Symfony/Component/Scheduler/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ CHANGELOG
1313
* Add `ScheduledStamp` to `RedispatchMessage`
1414
* Allow modifying Schedule instances at runtime
1515
* Add `MessageProviderInterface` to trigger unique messages at runtime
16+
* Add `PreRunEvent` and `PostRunEvent` events
17+
* Add `DispatchSchedulerEventListener`
1618

1719
6.3
1820
---

src/Symfony/Component/Scheduler/DependencyInjection/AddScheduleMessengerPass.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ class AddScheduleMessengerPass implements CompilerPassInterface
3030
{
3131
public function process(ContainerBuilder $container): void
3232
{
33+
if (!$container->has('event_dispatcher')) {
34+
$container->removeDefinition('scheduler.event_listener');
35+
}
36+
3337
$receivers = [];
3438
foreach ($container->findTaggedServiceIds('messenger.receiver') as $tags) {
3539
$receivers[$tags[0]['alias']] = true;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Event;
13+
14+
use Symfony\Component\Scheduler\Generator\MessageContext;
15+
use Symfony\Component\Scheduler\ScheduleProviderInterface;
16+
17+
class PostRunEvent
18+
{
19+
public function __construct(
20+
private readonly ScheduleProviderInterface $schedule,
21+
private readonly MessageContext $messageContext,
22+
private readonly object $message,
23+
) {
24+
}
25+
26+
public function getMessageContext(): MessageContext
27+
{
28+
return $this->messageContext;
29+
}
30+
31+
public function getSchedule(): ScheduleProviderInterface
32+
{
33+
return $this->schedule;
34+
}
35+
36+
public function getMessage(): object
37+
{
38+
return $this->message;
39+
}
40+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Event;
13+
14+
use Symfony\Component\Scheduler\Generator\MessageContext;
15+
use Symfony\Component\Scheduler\ScheduleProviderInterface;
16+
17+
class PreRunEvent
18+
{
19+
private bool $shouldCancel = false;
20+
21+
public function __construct(
22+
private readonly ScheduleProviderInterface $schedule,
23+
private readonly MessageContext $messageContext,
24+
private readonly object $message,
25+
) {
26+
}
27+
28+
public function getMessageContext(): MessageContext
29+
{
30+
return $this->messageContext;
31+
}
32+
33+
public function getSchedule(): ScheduleProviderInterface
34+
{
35+
return $this->schedule;
36+
}
37+
38+
public function getMessage(): object
39+
{
40+
return $this->message;
41+
}
42+
43+
public function shouldCancel(bool $shouldCancel = null): bool
44+
{
45+
if (null !== $shouldCancel) {
46+
$this->shouldCancel = $shouldCancel;
47+
}
48+
49+
return $this->shouldCancel;
50+
}
51+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\EventListener;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Psr\EventDispatcher\EventDispatcherInterface;
16+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
17+
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
19+
use Symfony\Component\Scheduler\Event\PostRunEvent;
20+
use Symfony\Component\Scheduler\Event\PreRunEvent;
21+
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
22+
23+
class DispatchSchedulerEventListener implements EventSubscriberInterface
24+
{
25+
public function __construct(
26+
private readonly ContainerInterface $scheduleProviderLocator,
27+
private readonly EventDispatcherInterface $eventDispatcher,
28+
) {
29+
}
30+
31+
public function onMessageHandled< 179B /span>(WorkerMessageHandledEvent $event): void
32+
{
33+
$envelope = $event->getEnvelope();
34+
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
35+
return;
36+
}
37+
38+
if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) {
39+
return;
40+
}
41+
42+
$this->eventDispatcher->dispatch(new PostRunEvent($this->scheduleProviderLocator->get($scheduledStamp->messageContext->name), $scheduledStamp->messageContext, $envelope->getMessage()));
43+
}
44+
45+
public function onMessageReceived(WorkerMessageReceivedEvent $event): void
46+
{
47+
$envelope = $event->getEnvelope();
48+
49+
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
50+
return;
51+
}
52+
53+
if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) {
54+
return;
55+
}
56+
57+
$preRunEvent = new PreRunEvent($this->scheduleProviderLocator->get($scheduledStamp->messageContext->name), $scheduledStamp->messageContext, $envelope->getMessage());
58+
59+
$this->eventDispatcher->dispatch($preRunEvent);
60+
61+
if ($preRunEvent->shouldCancel()) {
62+
$event->shouldHandle(false);
63+
}
64+
}
65+
66+
public static function getSubscribedEvents(): array
67+
{
68+
return [
69+
WorkerMessageReceivedEvent::class => ['onMessageReceived'],
70+
WorkerMessageHandledEvent::class => ['onMessageHandled'],
71+
];
72+
}
73+
}

src/Symfony/Component/Scheduler/Generator/MessageGenerator.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ public function getMessages(): \Generator
9393
$checkpoint->release($now, $this->waitUntil);
9494
}
9595

96+
public function getSchedule(): Schedule
97+
{
98+
return $this->schedule ??= $this->scheduleProvider->getSchedule();
99+
}
100+
96101
private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): TriggerHeap
97102
{
98103
if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) {
@@ -101,7 +106,7 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime):
101106

102107
$heap = new TriggerHeap($time);
103108

104-
foreach ($this->schedule()->getRecurringMessages() as $index => $recurringMessage) {
109+
foreach ($this->getSchedule()->getRecurringMessages() as $index => $recurringMessage) {
105110
$trigger = $recurringMessage->getTrigger();
106111

107112
if ($trigger instanceof StatefulTriggerInterface) {
@@ -118,13 +123,8 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime):
118123
return $this->triggerHeap = $heap;
119124
}
120125

121-
private function schedule(): Schedule
122-
{
123-
return $this->schedule ??= $this->scheduleProvider->getSchedule();
124-
}
125-
126126
private function checkpoint(): Checkpoint
127127
{
128-
return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->schedule()->getLock(), $this->schedule()->getState());
128+
return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->getSchedule()->getLock(), $this->getSchedule()->getState());
129129
}
130130
}

src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,9 @@ public function send(Envelope $envelope): Envelope
5454
{
5555
throw new LogicException(sprintf('"%s" cannot send messages.', __CLASS__));
5656
}
57+
58+
public function getMessageGenerator(): MessageGeneratorInterface
59+
{
60+
return $this->messageGenerator;
61+
}
5762
}

src/Symfony/Component/Scheduler/Schedule.php

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,29 @@
1111

1212
namespace Symfony\Component\Scheduler;
1313

14+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
1415
use Symfony\Component\Lock\LockInterface;
16+
use Symfony\Component\Scheduler\Event\PostRunEvent;
17+
use Symfony\Component\Scheduler\Event\PreRunEvent;
1518
use Symfony\Component\Scheduler\Exception\LogicException;
1619
use Symfony\Contracts\Cache\CacheInterface;
1720

1821
final class Schedule implements ScheduleProviderInterface
1922
{
23+
public function __construct(
24+
private readonly ?EventDispatcherInterface $dispatcher = null,
25+
) {
26+
}
27+
2028
/** @var array<string,RecurringMessage> */
2129
private array $messages = [];
2230
private ?LockInterface $lock = null;
2331
private ?CacheInterface $state = null;
2432
private bool $shouldRestart = false;
2533

26-
public static function with(RecurringMessage $message, RecurringMessage ...$messages): static
34+
public function with(RecurringMessage $message, RecurringMessage ...$messages): static
2735
{
28-
return static::doAdd(new self(), $message, ...$messages);
36+
return static::doAdd(new self($this->dispatcher), $message, ...$messages);
2937
}
3038

3139
/**
@@ -62,6 +70,17 @@ public function remove(RecurringMessage $message): static
6270
return $this;
6371
}
6472

73+
/**
74+
* @return $this
75+
*/
76+
public function removeById(string $id): static
77+
{
78+
unset($this->messages[$id]);
79+
$this->setRestart(true);
80+
81+
return $this;
82+
}
83+
6584
/**
6685
* @return $this
6786
*/
@@ -119,6 +138,20 @@ public function getSchedule(): static
119138
return $this;
120139
}
121140

141+
public function before(callable $listener, int $priority = 0): static
142+
{
143+
$this->dispatcher->addListener(PreRunEvent::class, $listener, $priority);
144+
145+
return $this;
146+
}
147+
148+
public function after(callable $listener, int $priority = 0): static
149+
{
150+
$this->dispatcher->addListener(PostRunEvent::class, $listener, $priority);
151+
152+
return $this;
153+
}
154+
122155
public function shouldRestart(): bool
123156
{
124157
return $this->shouldRestart;

src/Symfony/Component/Scheduler/Scheduler.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@
1111

1212
namespace Symfony\Component\Scheduler;
1313

14+
use Psr\EventDispatcher\EventDispatcherInterface;
1415
use Symfony\Component\Clock\Clock;
1516
use Symfony\Component\Clock\ClockInterface;
17+
use Symfony\Component\Scheduler\Event\PostRunEvent;
18+
use Symfony\Component\Scheduler\Event\PreRunEvent;
1619
use Symfony\Component\Scheduler\Generator\MessageGenerator;
1720

1821
final class Scheduler
@@ -31,6 +34,7 @@ public function __construct(
3134
private readonly array $handlers,
3235
array $schedules,
3336
private readonly ClockInterface $clock = new Clock(),
37+
private readonly ?EventDispatcherInterface $dispatcher = null,
3438
) {
3539
foreach ($schedules as $schedule) {
3640
$this->addSchedule($schedule);
@@ -62,9 +66,25 @@ public function run(array $options = []): void
6266

6367
$ran = false;
6468
foreach ($this->generators as $generator) {
65-
foreach ($generator->getMessages() as $message) {
69+
foreach ($generator->getMessages() as $context => $message) {
70+
if (!$this->dispatcher) {
71+
$this->handlers[$message::class]($message);
72+
$ran = true;
73+
74+
continue;
75+
}
76+
77+
$preRunEvent = new PreRunEvent($generator->getSchedule(), $context, $message);
78+
$this->dispatcher->dispatch($preRunEvent);
79+
80+
if ($preRunEvent->shouldCancel()) {
81+
continue;
82+
}
83+
6684
$this->handlers[$message::class]($message);
6785
$ran = true;
86+
87+
$this->dispatcher->dispatch(new PostRunEvent($generator->getSchedule(), $context, $message));
6888
}
6989
}
7090

0 commit comments

Comments
 (0)
0