8000 [Scheduler] add PRE_RUN and POST_RUN events · symfony/symfony@f2cfe76 · GitHub
[go: up one dir, main page]

Skip to content

Commit f2cfe76

Browse files
committed
[Scheduler] add PRE_RUN and POST_RUN events
1 parent 541e845 commit f2cfe76

File tree

11 files changed

+325
-10
lines changed

11 files changed

+325
-10
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/scheduler.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\DependencyInjection\Loader\Configurator;
1313

14+
use Symfony\Component\Scheduler\EventListener\DispatchSchedulerEventListener;
1415
use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory;
1516
use Symfony\Component\Scheduler\Messenger\ServiceCallMessageHandler;
1617

@@ -27,5 +28,11 @@
2728
service('clock'),
2829
])
2930
->tag('messenger.transport_factory')
31+
->set('scheduler.event_listener', DispatchSchedulerEventListener::class)
32+
->args([
33+
service('messenger.receiver_locator'),
34+
service('event_dispatcher'),
35+
])
36+
->tag('kernel.event_subscriber')
3037
;
3138
};
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\EventListener;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Psr\Clock\ClockInterface;
16+
use Psr\Container\ContainerInterface;
17+
use Symfony\Component\EventDispatcher\EventDispatcher;
18+
use Symfony\Component\Messenger\Envelope;
19+
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
20+
use Symfony\Component\Scheduler\Event\PreRunEvent;
21+
use Symfony\Component\Scheduler\EventListener\DispatchSchedulerEventListener;
22+
use Symfony\Component\Scheduler\Generator\MessageContext;
23+
use Symfony\Component\Scheduler\Generator\MessageGenerator;
24+
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
25+
use Symfony\Component\Scheduler\Messenger\SchedulerTransport;
26+
use Symfony\Component\Scheduler\RecurringMessage;
27+
use Symfony\Component\Scheduler\Tests\Messenger\SomeScheduleProvider;
28+
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
29+
30+
class DispatchSchedulerEventListenerTest extends TestCase
31+
{
32+
public function testDispatchSchedulerEvents()
33+
{
34+
$trigger = $this->createMock(TriggerInterface::class);
35+
$clock = $this->createMock(ClockInterface::class);
36+
$defaultRecurringMessage = RecurringMessage::trigger($trigger, (object) ['id' => 'default']);
37+
$default = new SchedulerTransport(new MessageGenerator(new SomeScheduleProvider([$defaultRecurringMessage]), 'default', $clock));
38+
39+
$receiverLocator = $this->createMock(ContainerInterface::class);
40+
$receiverLocator->expects($this->once())->method('has')->willReturn(true);
41+
$receiverLocator->expects($this->once())->method('get')->willReturn($default);
42+
43+
$context = new MessageContext('default', 'default', $trigger, $this->createMock(\DateTimeImmutable::class));
44+
$envelope = (new Envelope(new \stdClass()))->with(new ScheduledStamp($context));
45+
46+
/** @var ContainerInterface $receiverLocator */
47+
$listener = new DispatchSchedulerEventListener($receiverLocator, $eventDispatcher = new EventDispatcher());
48+
$workerReceivedEvent = new WorkerMessageReceivedEvent($envelope, 'default');
49+
$secondListener = new TestEventListener();
50+
51+
$eventDispatcher->addListener(PreRunEvent::class, [$secondListener, 'preRun']);
52+
$eventDispatcher->addListener(PreRunEvent::class, [$secondListener, 'postRun']);
53+
$listener->onMessageReceived($workerReceivedEvent);
54+
55+
$this->assertTrue($secondListener->preInvoked);
56+
$this->assertTrue($secondListener->postInvoked);
57+
}
58+
}
59+
60+
class TestEventListener
61+
{
62+
public string $name;
63+
public bool $preInvoked = false;
64+
public bool $postInvoked = false;
65+
66+
/* Listener methods */
67+
68+
public function preRun($e)
69+
{
70+
$this->preInvoked = true;
71+
}
72+
73+
public function postRun($e)
74+
{
75+
$this->postInvoked = true;
76+
}
77+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Event;
13+
14+
use Symfony\Component\Scheduler\Generator\MessageContext;
15+
use Symfony\Component\Scheduler\ScheduleProviderInterface;
16+
17+
class PostRunEvent
18+
{
19+
public function __construct(
20+
private readonly ScheduleProviderInterface $schedule,
21+
private readonly MessageContext $messageContext,
22+
private readonly object $message
23+
) {
24+
}
25+
26+
public function getMessageContext(): MessageContext
27+
{
28+
return $this->messageContext;
29+
}
30+
31+
public function getSchedule(): ScheduleProviderInterface
32+
{
33+
return $this->schedule;
34+
}
35+
36+
public function getMessage(): object
37+
{
38+
return $this->message;
39+
}
40+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Event;
13+
14+
use Symfony\Component\Scheduler\Generator\MessageContext;
15+
use Symfony\Component\Scheduler\ScheduleProviderInterface;
16+
17+
class PreRunEvent
18+
{
19+
private bool $hasBeenCancelled = false;
20+
21+
public function __construct(
22+
private readonly ScheduleProviderInterface $schedule,
23+
private readonly MessageContext $messageContext,
24+
private readonly object $message
25+
) {
26+
}
27+
28+
public function getMessageContext(): MessageContext
29+
{
30+
return $this->messageContext;
31+
}
32+
33+
public function getSchedule(): ScheduleProviderInterface
34+
{
35+
return $this->schedule;
36+
}
37+
38+
public function getMessage(): object
39+
{
40+
return $this->message;
41+
}
42+
43+
public function hasBeenCancelled(bool $hasBeenCancelled = null): bool
44+
{
45+
if (null !== $hasBeenCancelled) {
46+
$this->hasBeenCancelled = $hasBeenCancelled;
4 10000 7+
}
48+
49+
return $this->hasBeenCancelled;
50+
}
51+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\EventListener;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Psr\EventDispatcher\EventDispatcherInterface;
16+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
17+
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
19+
use Symfony\Component\Scheduler\Event\PostRunEvent;
20+
use Symfony\Component\Scheduler\Event\PreRunEvent;
21+
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
22+
use Symfony\Component\Scheduler\Messenger\SchedulerTransport;
23+
24+
class DispatchSchedulerEventListener implements EventSubscriberInterface
25+
{
26+
public function __construct(
27+
private readonly ContainerInterface $receiverLocator,
28+
private readonly EventDispatcherInterface $eventDispatcher,
29+
) {
30+
}
31+
32+
public function onMessageHandled(WorkerMessageHandledEvent $event): void
33+
{
34+
$envelope = $event->getEnvelope();
35+
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
36+
return;
37+
}
38+
39+
$messageContext = $scheduledStamp->messageContext;
40+
41+
if (!$this->receiverLocator->has($event->getReceiverName())) {
42+
return;
43+
}
44+
45+
/** @var SchedulerTransport $schedulerTransport */
46+
$schedulerTransport = $this->receiverLocator->get($event->getReceiverName());
47+
$schedule = $schedulerTransport->getMessageGenerator()->schedule();
48+
49+
$this->eventDispatcher->dispatch(new PostRunEvent($schedule, $messageContext, $envelope->getMessage()));
50+
}
51+
52+
public function onMessageReceived(WorkerMessageReceivedEvent $event): void
53+
{
54+
$envelope = $event->getEnvelope();
55+
56+
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
57+
return;
58+
}
59+
60+
$messageContext = $scheduledStamp->messageContext;
61+
62+
if (!$this->receiverLocator->has($event->getReceiverName())) {
63+
return;
64+
}
65+
66+
/** @var SchedulerTransport $schedulerTransport */
67+
$schedulerTransport = $this->receiverLocator->get($event->getReceiverName());
68+
$schedule = $schedulerTransport->getMessageGenerator()->schedule();
69+
70+
$preRunEvent = new PreRunEvent($schedule, $messageContext, $envelope->getMessage());
71+
72+
$this->eventDispatcher->dispatch($preRunEvent);
73+
74+
if ($preRunEvent->hasBeenCancelled()) {
75+
$event->shouldHandle(false);
76+
}
77+
}
78+
79+
public static function getSubscribedEvents(): array
80+
{
81+
return [
82+
WorkerMessageReceivedEvent::class => ['onMessageReceived'],
83+
WorkerMessageHandledEvent::class => ['onMessageHandled'],
84+
];
85+
}
86+
}

src/Symfony/Component/Scheduler/Generator/MessageGenerator.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ public function getMessages(): \Generator
9393
$checkpoint->release($now, $this->waitUntil);
9494
}
9595

96+
public function schedule(): Schedule
97+
{
98+
return $this->schedule ??= $this->scheduleProvider->getSchedule();
99+
}
100+
96101
private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): TriggerHeap
97102
{
98103
if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) {
@@ -118,11 +123,6 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime):
118123
return $this->triggerHeap = $heap;
119124
}
120125

121-
private function schedule(): Schedule
122-
{
123-
return $this->schedule ??= $this->scheduleProvider->getSchedule();
124-
}
125-
126126
private function checkpoint(): Checkpoint
127127
{
128128
return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->schedule()->getLock(), $this->schedule()->getState());

src/Symfony/Component/Scheduler/Generator/MessageGeneratorInterface.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@
1111

1212
namespace Symfony\Component\Scheduler\Generator;
1313

14+
use Symfony\Component\Scheduler\Schedule;
15+
1416
interface MessageGeneratorInterface
1517
{
1618
/**
1719
* @return iterable<MessageContext, object>
1820
*/
1921
public function getMessages(): iterable;
22+
23+
public function schedule(): Schedule;
2024
}

src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,9 @@ public function send(Envelope $envelope): Envelope
5454
{
5555
throw new LogicException(sprintf('"%s" cannot send messages.', __CLASS__));
5656
}
57+
58+
public function getMessageGenerator(): MessageGeneratorInterface
59+
{
60+
return $this->messageGenerator;
61+
}
5762
}

src/Symfony/Component/Scheduler/Schedule.php

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,29 @@
1111

1212
namespace Symfony\Component\Scheduler;
1313

14+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
1415
use Symfony\Component\Lock\LockInterface;
16+
use Symfony\Component\Scheduler\Event\PostRunEvent;
17+
use Symfony\Component\Scheduler\Event\PreRunEvent;
1518
use Symfony\Component\Scheduler\Exception\LogicException;
1619
use Symfony\Contracts\Cache\CacheInterface;
1720

1821
final class Schedule implements ScheduleProviderInterface
1922
{
23+
public function __construct(
24+
private readonly ?EventDispatcherInterface $dispatcher = null
25+
) {
26+
}
27+
2028
/** @var array<string,RecurringMessage> */
2129
private array $messages = [];
2230
private ?LockInterface $lock = null;
2331
private ?CacheInterface $state = null;
2432
private bool $shouldRestart = false;
2533

26-
public static function with(RecurringMessage $message, RecurringMessage ...$messages): static
34+
public function with(RecurringMessage $message, RecurringMessage ...$messages): static
2735
{
28-
return static::doAdd(new self(), $message, ...$messages);
36+
return static::doAdd(new self($this->dispatcher), $message, ...$messages);
2937
}
3038

3139
/**
@@ -62,6 +70,17 @@ public function remove(RecurringMessage $message): static
6270
return $this;
6371
}
6472

73+
/**
74+
* @return $this
75+
*/
76+
public function removeById(string $id): static
77+
{
78+
unset($this->messages[$id]);
79+
$this->setRestart(true);
80+
81+
return $this;
82+
}
83+
6584
/**
6685
* @return $this
6786
*/
@@ -119,6 +138,20 @@ public function getSchedule(): static
119138
return $this;
120139
}
121140

141+
public function before(callable $listener, int $priority = 0): static
142+
{
143+
$this->dispatcher->addListener(PreRunEvent::class, $listener, $priority);
144+
145+
return $this;
146+
}
147+
148+
public function after(callable $listener, int $priority = 0): static
149+
{
150+
$this->dispatcher->addListener(PostRunEvent::class, $listener, $priority);
151+
152+
return $this;
153+
}
154+
122155
public function shouldRestart(): bool
123156
{
124157
return $this->shouldRestart;

0 commit comments

Comments
 (0)
0