8000 [Scheduler] add PRE_RUN and POST_RUN events · symfony/symfony@f462be9 · GitHub
[go: up one dir, main page]

Skip to content

Commit f462be9

Browse files
committed
[Scheduler] add PRE_RUN and POST_RUN events
1 parent cdf30c5 commit f462be9

File tree

12 files changed

+333
-11
lines changed

12 files changed

+333
-11
lines changed

src/Symfony/Bundle/FrameworkBundle/Resources/config/scheduler.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\DependencyInjection\Loader\Configurator;
1313

14+
use Symfony\Component\Scheduler\EventListener\DispatchSchedulerEventListener;
1415
use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory;
1516
use Symfony\Component\Scheduler\Messenger\ServiceCallMessageHandler;
1617

@@ -27,5 +28,11 @@
2728
service('clock'),
2829
])
2930
->tag('messenger.transport_factory')
31+
->set('scheduler.event_listener', DispatchSchedulerEventListener::class)
32+
->args([
33+
service('messenger.receiver_locator'),
34+
service('event_dispatcher'),
35+
])
36+
->tag('kernel.event_subscriber')
3037
;
3138
};
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\EventListener;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Psr\Clock\ClockInterface;
16+
use Psr\Container\ContainerInterface;
17+
use Symfony\Component\EventDispatcher\EventDispatcher;
18+
use Symfony\Component\Messenger\Envelope;
19+
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
20+
use Symfony\Component\Scheduler\Event\PreRunEvent;
21+
use Symfony\Component\Scheduler\EventListener\DispatchSchedulerEventListener;
22+
use Symfony\Component\Scheduler\Generator\MessageContext;
23+
use Symfony\Component\Scheduler\Generator\MessageGenerator;
24+
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
25+
use Symfony\Component\Scheduler\Messenger\SchedulerTransport;
26+
use Symfony\Component\Scheduler\RecurringMessage;
27+
use Symfony\Component\Scheduler\Tests\Messenger\SomeScheduleProvider;
28+
use Symfony\Component\Scheduler\Trigger\TriggerInterface;
29+
30+
class DispatchSchedulerEventListenerTest extends TestCase
31+
{
32+
public function testDispatchSchedulerEvents()
33+
{
34+
$trigger = $this->createMock(TriggerInterface::class);
35+
$clock = $this->createMock(ClockInterface::class);
36+
$defaultRecurringMessage = RecurringMessage::trigger($trigger, (object) ['id' => 'default']);
37+
$default = new SchedulerTransport(new MessageGenerator(new SomeScheduleProvider([$defaultRecurringMessage]), 'default', $clock));
38+
39+
$receiverLocator = $this->createMock(ContainerInterface::class);
40+
$receiverLocator->expects($this->once())->method('has')->willReturn(true);
41+
$receiverLocator->expects($this->once())->method('get')->willReturn($default);
42+
43+
$context = new MessageContext('default', 'default', $trigger, $this->createMock(\DateTimeImmutable::class));
44+
$envelope = (new Envelope(new \stdClass()))->with(new ScheduledStamp($context));
45+
46+
/** @var ContainerInterface $receiverLocator */
47+
$listener = new DispatchSchedulerEventListener($receiverLocator, $eventDispatcher = new EventDispatcher());
48+
$workerReceivedEvent = new WorkerMessageReceivedEvent($envelope, 'default');
49+
$secondListener = new TestEventListener();
50+
51+
$eventDispatcher->addListener(PreRunEvent::class, [$secondListener, 'preRun']);
52+
$eventDispatcher->addListener(PreRunEvent::class, [$secondListener, 'postRun']);
53+
$listener->onMessageReceived($workerReceivedEvent);
54+
55+
$this->assertTrue($secondListener->preInvoked);
56+
$this->assertTrue($secondListener->postInvoked);
57+
}
58+
}
59+
60+
class TestEventListener
61+
{
62+
public string $name;
63+
public bool $preInvoked = false;
64+
public bool $postInvoked = false;
65+
66+
/* Listener methods */
67+
68+
public function preRun($e)
69+
{
70+
$this->preInvoked = true;
71+
}
72+
73+
public function postRun($e)
74+
{
75+
$this->postInvoked = true;
76+
}
77+
}

src/Symfony/Component/Scheduler/DependencyInjection/AddScheduleMessengerPass.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ class AddScheduleMessengerPass implements CompilerPassInterface
3030
{
3131
public function process(ContainerBu 10000 ilder $container): void
3232
{
33+
if (!$container->has('event_dispatcher')) {
34+
$container->removeDefinition('scheduler.event_listener');
35+
}
36+
3337
$receivers = [];
3438
foreach ($container->findTaggedServiceIds('messenger.receiver') as $tags) {
3539
$receivers[$tags[0]['alias']] = true;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Event;
13+
14+
use Symfony\Component\Scheduler\Generator\MessageContext;
15+
use Symfony\Component\Scheduler\ScheduleProviderInterface;
16+
17+
class PostRunEvent
18+
{
19+
public function __construct(
20+
private readonly ScheduleProviderInterface $schedule,
21+
private readonly MessageContext $messageContext,
22+
private readonly object $message,
23+
) {
24+
}
25+
26+
public function getMessageContext(): MessageContext
27+
{
28+
return $this->messageContext;
29+
}
30+
31+
public function getSchedule(): ScheduleProviderInterface
32+
{
5C7
33+
return $this->schedule;
34+
}
35+
36+
public function getMessage(): object
37+
{
38+
return $this->message;
39+
}
40+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\Event;
13+
14+
use Symfony\Component\Scheduler\Generator\MessageContext;
15+
use Symfony\Component\Scheduler\ScheduleProviderInterface;
16+
17+
class PreRunEvent
18+
{
19+
private bool $shouldCancel = false;
20+
21+
public function __construct(
22+
private readonly ScheduleProviderInterface $schedule,
23+
private readonly MessageContext $messageContext,
24+
private readonly object $message,
25+
) {
26+
}
27+
28+
public function getMessageContext(): MessageContext
29+
{
30+
return $this->messageContext;
31+
}
32+
33+
public function getSchedule(): ScheduleProviderInterface
34+
{
35+
return $this->schedule;
36+
}
37+
38+
public function getMessage(): object
39+
{
40+
return $this->message;
41+
}
42+
43+
public function shouldCancel(bool $shouldCancel = null): bool
44+
{
45+
if (null !== $shouldCancel) {
46+
$this->shouldCancel = $shouldCancel;
47+
}
48+
49+
return $this->shouldCancel;
50+
}
51+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Scheduler\EventListener;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Psr\EventDispatcher\EventDispatcherInterface;
16+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
17+
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
18+
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
19+
use Symfony\Component\Scheduler\Event\PostRunEvent;
20+
use Symfony\Component\Scheduler\Event\PreRunEvent;
21+
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
22+
use Symfony\Component\Scheduler\Messenger\SchedulerTransport;
23+
24+
class DispatchSchedulerEventListener implements EventSubscriberInterface
25+
{
26+
public function __construct(
27+
private readonly ContainerInterface $receiverLocator,
28+
private readonly EventDispatcherInterface $eventDispatcher,
29+
) {
30+
}
31+
32+
public function onMessageHandled(WorkerMessageHandledEvent $event): void
33+
{
34+
$envelope = $event->getEnvelope();
35+
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
36+
return;
37+
}
38+
39+
if (!$this->receiverLocator->has($event->getReceiverName())) {
40+
return;
41+
}
42+
43+
/** @var SchedulerTransport $schedulerTransport */
44+
$schedulerTransport = $this->receiverLocator->get($event->getReceiverName());
45+
$schedule = $schedulerTransport->getMessageGenerator()->getSchedule();
46+
47+
$this->eventDispatcher->dispatch(new PostRunEvent($schedule, $scheduledStamp->messageContext, $envelope->getMessage()));
48+
}
49+
50+
public function onMessageReceived(WorkerMessageReceivedEvent $event): void
51+
{
52+
$envelope = $event->getEnvelope();
53+
54+
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
55+
return;
56+
}
57+
58+
if (!$this->receiverLocator->has($event->getReceiverName())) {
59+
return;
60+
}
61+
62+
/** @var SchedulerTransport $schedulerTransport */
63+
$schedulerTransport = $this->receiverLocator->get($event->getReceiverName());
64+
$schedule = $schedulerTransport->getMessageGenerator()->getSchedule();
65+
66+
$preRunEvent = new PreRunEvent($schedule, $scheduledStamp->messageContext, $envelope->getMessage());
67+
68+
$this->eventDispatcher->dispatch($preRunEvent);
69+
70+
if ($preRunEvent->shouldCancel()) {
71+
$event->shouldHandle(false);
72+
}
73+
}
74+
75+
public static function getSubscribedEvents(): array
76+
{
77+
return [
78+
WorkerMessageReceivedEvent::class => ['onMessageReceived'],
79+
WorkerMessageHandledEvent::class => ['onMessageHandled'],
80+
];
81+
}
82+
}

src/Symfony/Component/Scheduler/Generator/MessageGenerator.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ public function getMessages(): \Generator
9393
$checkpoint->release($now, $this->waitUntil);
9494
}
9595

96+
public function getSchedule(): Schedule
97+
{
98+
return $this->schedule ??= $this->scheduleProvider->getSchedule();
99+
}
100+
96101
private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): TriggerHeap
97102
{
98103
if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) {
@@ -101,7 +106,7 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime):
101106

102107
$heap = new TriggerHeap($time);
103108

104-
foreach ($this->schedule()->getRecurringMessages() as $index => $recurringMessage) {
109+
foreach ($this->getSchedule()->getRecurringMessages() as $index => $recurringMessage) {
105110
$trigger = $recurringMessage->getTrigger();
106111

107112
if ($trigger instanceof StatefulTriggerInterface) {
@@ -118,13 +123,8 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime):
118123
return $this->triggerHeap = $heap;
119124
}
120125

121-
private function schedule(): Schedule
122-
{
123-
return $this->schedule ??= $this->scheduleProvider->getSchedule();
124-
}
125-
126126
private function checkpoint(): Checkpoint
127127
{
128-
return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->schedule()->getLock(), $this->schedule()->getState());
128+
return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->getSchedule()->getLock(), $this->getSchedule()->getState());
129129
}
130130
}

src/Symfony/Component/Scheduler/Generator/MessageGeneratorInterface.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@
1111

1212
namespace Symfony\Component\Scheduler\Generator;
1313

14+
use Symfony\Component\Scheduler\Schedule;
15+
1416
interface MessageGeneratorInterface
1517
{
1618
/**
1719
* @return iterable<MessageContext, object>
1820
*/
1921
public function getMessages(): iterable;
22+
23+
public function getSchedule(): Schedule;
2024
}

src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,9 @@ public function send(Envelope $envelope): Envelope
5454
{
5555
throw new LogicException(sprintf('"%s" cannot send messages.', __CLASS__));
5656
}
57+
58+
public function getMessageGenerator(): MessageGeneratorInterface
59+
{
60+
return $this->messageGenerator;
61+
}
5762
}

0 commit comments

Comments
 (0)
0