8000 [Scheduler] add PRE_RUN and POST_RUN events · symfony/symfony@0a0fe36 · GitHub
[go: up one dir, main page]

Skip to content

Commit 0a0fe36

Browse files
committed
[Scheduler] add PRE_RUN and POST_RUN events
1 parent 541e845 commit 0a0fe36

File tree

16 files changed

+276
-20
lines changed

16 files changed

+276
-20
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

+8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
1919
use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
2020
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
21+
use Symfony\Component\Messenger\EventListener\DispatchSchedulerEventListener;
2122
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2223
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
2324
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
@@ -179,6 +180,13 @@
179180
->tag('kernel.event_subscriber')
180181
->tag('monolog.logger', ['channel' => 'messenger'])
181182

183+
->set('messenger.disp A3DB atch.scheduler_event_listener', DispatchSchedulerEventListener::class)
184+
->args([
185+
service('messenger.receiver_locator'),
186+
service('event_dispatcher'),
187+
])
188+
->tag('kernel.event_subscriber')
189+
182190
->set('messenger.failure.add_error_details_stamp_listener', AddErrorDetailsStampListener::class)
183191
->tag('kernel.event_subscriber')
184192

src/Symfony/Bundle/FrameworkBundle/Tests/Fixtures/Messenger/DummySchedule.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger;
44

55
use Symfony\Component\Cache\Adapter\ArrayAdapter;
6+
use Symfony\Component\EventDispatcher\EventDispatcher;
67
use Symfony\Component\Lock\Key;
78
use Symfony\Component\Lock\Lock;
89
use Symfony\Component\Lock\Store\InMemoryStore;
@@ -17,7 +18,7 @@ class DummySchedule implements ScheduleProviderInterface
1718

1819
public function getSchedule(): Schedule F438
1920
{
20-
return (new Schedule())
21+
return (new Schedule(new EventDispatcher()))
2122
->add(...self::$recurringMessages)
2223
->stateful(new ArrayAdapter())
2324
->lock(new Lock(new Key('dummy'), new InMemoryStore()))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\EventListener;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Psr\EventDispatcher\EventDispatcherInterface;
16+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
17+
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
19+
use Symfony\Component\Scheduler\Event\PostRunEvent;
20+
use Symfony\Component\Scheduler\Event\PreRunEvent;
21+
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
22+
use Symfony\Component\Scheduler\Messenger\SchedulerTransport;
23+
24+
class DispatchSchedulerEventListener implements EventSubscriberInterface
25+
{
26+
public function __construct(
27+
private readonly ContainerInterface $receiverLocator,
28+
private readonly ?EventDispatcherInterface $eventDispatcher = null
29+
) {
30+
}
31+
32+
public function onMessageHandled(WorkerMessageHandledEvent $event): void
33+
{
34+
$envelope = $event->getEnvelope();
35+
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
36+
return;
37+
}
38+
39+
$messageContext = $scheduledStamp->messageContext;
40+
41+
if (!$this->receiverLocator->has($event->getReceiverName())) {
42+
return;
43+
}
44+
45+
/** @var SchedulerTransport $schedulerTransport */
46+
$schedulerTransport = $this->receiverLocator->get($event->getReceiverName());
47+
$schedule = $schedulerTransport->getMessageGenerator()->schedule();
48+
49+
$this->eventDispatcher?->dispatch(new PostRunEvent($schedule, $messageContext, $envelope->getMessage()));
50+
}
51+
52+
public function onMessageReceived(WorkerMessageReceivedEvent $event): void
53+
{
54+
$envelope = $event->getEnvelope();
55+
56+
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
57+
return;
58+
}
59+
60+
$messageContext = $scheduledStamp->messageContext;
61+
F42D
62+
if (!$this->receiverLocator->has($event->getReceiverName())) {
63+
return;
64+
}
65+
66+
/** @var SchedulerTransport $schedulerTransport */
67+
$schedulerTransport = $this->receiverLocator->get($event->getReceiverName());
68+
$schedule = $schedulerTransport->getMessageGenerator()->schedule();
69+
70+
$preRunEvent = new PreRunEvent($schedule, $messageContext, $envelope->getMessage());
71+
72+
$this->eventDispatcher?->dispatch($preRunEvent);
73+
74+
if ($preRunEvent->hasBeenCancelled()) {
75+
$event->shouldHandle(false);
76+
}
77+
}
78+
79+
public static function getSubscribedEvents(): array
80+
{
81+
return [
82+
WorkerMessageReceivedEvent::class => ['onMessageReceived'],
83+
WorkerMessageHandledEvent::class => ['onMessageHandled'],
84+
];
85+
}
86+
}

src/Symfony/Component/Messenger/Worker.php

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
3535
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3636
use Symfony\Component\RateLimiter\LimiterInterface;
37+
use Symfony\Component\Scheduler\Event\PostRunEvent;
38+
use Symfony\Component\Scheduler\Event\PreRunEvent;
39+
use Symfony\Component\Scheduler\Event\SchedulerEvents;
3740

3841
/**
3942
* @author Samuel Roze <samuel.roze@gmail.com>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Event;
13+
14+
15+
use Symfony\Component\Scheduler\Generator\MessageContext;
16+
use Symfony\Component\Scheduler\ScheduleProviderInterface;
17+
18+
class PostRunEvent
19+
{
20+
public function __construct(private readonly ScheduleProviderInterface $schedule, private readonly MessageContext $messageContext, private readonly object $message)
21+
{
22+
}
23+
24+
public function getMessageContext(): MessageContext
25+
{
26+
return $this->messageContext;
27+
}
28+
29+
public function getSchedule(): ScheduleProviderInterface
30+
{
31+
return $this->schedule;
32+
}
33+
34+
public function getMessage(): object
35+
{
36+
return $this->message;
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Event;
13+
14+
use Symfony\Component\Scheduler\Generator\MessageContext;
15+
use Symfony\Component\Scheduler\ScheduleProviderInterface;
16+
17+
class PreRunEvent
18+
{
19+
private bool $isCancel = false;
20+
21+
public function __construct(private readonly ScheduleProviderInterface $schedule, private readonly MessageContext $messageContext, private readonly object $message)
22+
{
23+
}
24+
25+
public function getMessageContext(): MessageContext
26+
{
27+
return $this->messageContext;
28+
}
29+
30+
public function getSchedule(): ScheduleProviderInterface
31+
{
32+
return $this->schedule;
33+
}
34+
35+
public function getMessage(): object
36+
{
37+
return $this->message;
38+
}
39+
40+
public function shouldCancel(bool $shouldCancel = null): bool
41+
{
42+
if (null !== $shouldCancel) {
43+
$this->isCancel = $shouldCancel;
44+
}
45+
46+
return $this->isCancel;
47+
}
48+
49+
public function hasBeenCancelled(): bool
50+
{
51+
return $this->isCancel;
52+
}
53+
}

src/Symfony/Component/Scheduler/Generator/MessageGenerator.php

+5-5
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ public function getMessages(): \Generator
9393
$checkpoint->release($now, $this->waitUntil);
9494
}
9595

96+
public function schedule(): Schedule
97+
{
98+
return $this->schedule ??= $this->scheduleProvider->getSchedule();
99+
}
100+
96101
private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): TriggerHeap
97102
{
98103
if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) {
@@ -118,11 +123,6 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime):
118123
return $this->triggerHeap = $heap;
119124
}
120125

121-
private function schedule(): Schedule
122-
{
123-
return $this->schedule ??= $this->scheduleProvider->getSchedule();
124-
}
125-
126126
private function checkpoint(): Checkpoint
127127
{
128128
return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->schedule()->getLock(), $this->schedule()->getState());

src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php

+5
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,9 @@ public function send(Envelope $envelope): Envelope
5454
{
5555
throw new LogicException(sprintf('"%s" cannot send messages.', __CLASS__));
5656
}
57+
58+
public function getMessageGenerator(): MessageGeneratorInterface
59+
{
60+
return $this->messageGenerator;
61+
}
5762
}

src/Symfony/Component/Scheduler/Schedule.php

+34-2
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,28 @@
1111

1212
namespace Symfony\Component\Scheduler;
1313

14+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
1415
use Symfony\Component\Lock\LockInterface;
16+
use Symfony\Component\Scheduler\Event\PostRunEvent;
17+
use Symfony\Component\Scheduler\Event\PreRunEvent;
1518
use Symfony\Component\Scheduler\Exception\LogicException;
1619
use Symfony\Contracts\Cache\CacheInterface;
1720

1821
final class Schedule implements ScheduleProviderInterface
1922
{
23+
public function __construct(private readonly EventDispatcherInterface $dispatcher)
24+
{
25+
}
26+
2027
/** @var array<string,RecurringMessage> */
2128
private array $messages = [];
2229
private ?LockInterface $lock = null;
2330
private ?CacheInterface $state = null;
2431
private bool $shouldRestart = false;
2532

26-
public static function with(RecurringMessage $message, RecurringMessage ...$messages): static
33+
public function with(RecurringMessage $message, RecurringMessage ...$messages): static
2734
{
28-
return static::doAdd(new self(), $message, ...$messages);
35+
return static::doAdd(new self($this->dispatcher), $message, ...$messages);
2936
}
3037

3138
/**
@@ -62,6 +69,17 @@ public function remove(RecurringMessage $message): static
6269
return $this;
6370
}
6471

72+
/**
73+
* @return $this
74+
*/
75+
public function removeById(string $id): static
76+
{
77+
unset($this->messages[$id]);
78+
$this->setRestart(true);
79+
80+
return $this;
81+
}
82+
6583
/**
6684
* @return $this
6785
*/
@@ -119,6 +137,20 @@ public function getSchedule(): static
119137
return $this;
120138
}
121139

140+
public function before(callable $listener, int $priority = 0): static
141+
{
142+
$this->dispatcher->addListener(PreRunEvent::class, $listener, $priority);
143+
144+
return $this;
145+
}
146+
147+
public function after(callable $listener, int $priority = 0): static
148+
{
149+
$this->dispatcher->addListener(PostRunEvent::class, $listener, $priority);
150+
151+
return $this;
152+
}
153+
122154
public function shouldRestart(): bool
123155
{
124156
return $this->shouldRestart;

src/Symfony/Component/Scheduler/Scheduler.php

+11-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111

1212
namespace Symfony\Component\Scheduler;
1313

14+
use Psr\EventDispatcher\EventDispatcherInterface;
1415
use Symfony\Component\Clock\Clock;
1516
use Symfony\Component\Clock\ClockInterface;
17+
use Symfony\Component\Scheduler\Event\PreRunEvent;
1618
use Symfony\Component\Scheduler\Generator\MessageGenerator;
1719

1820
final class Scheduler
@@ -31,6 +33,7 @@ public function __construct(
3133
private readonly array $handlers,
3234
array $schedules,
3335
private readonly ClockInterface $clock = new Clock(),
36+
private readonly ?EventDispatcherInterface $dispatcher = null
3437
) {
3538
foreach ($schedules as $schedule) {
3639
$this->addSchedule($schedule);
@@ -62,8 +65,14 @@ public function run(array $options = []): void
6265

6366
$ran = false;
6467
foreach ($this->generators as $generator) {
65-
foreach ($generator->getMessages() as $message) {
66-
$this->handlers[$message::class]($message);
68+
foreach ($generator->getMessages() as $context => $message) {
69+
$preRunEvent = new PreRunEvent($generator->schedule(), $context, $message);
70+
$this->dispatcher?->dispatch($preRunEvent);
71+
72+
if (!$preRunEvent->hasBeenCancelled()) {
73+
$this->handlers[$message::class]($message);
74+
}
75+
6776
$ran = true;
6877
}
6978
}

0 commit comments

Comments
 (0)
0