diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/scheduler.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/scheduler.php index 7dad84b465f4..7b2856d8272e 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/scheduler.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/scheduler.php @@ -11,6 +11,7 @@ namespace Symfony\Component\DependencyInjection\Loader\Configurator; +use Symfony\Component\Scheduler\EventListener\DispatchSchedulerEventListener; use Symfony\Component\Scheduler\Messenger\SchedulerTransportFactory; use Symfony\Component\Scheduler\Messenger\ServiceCallMessageHandler; @@ -27,5 +28,11 @@ service('clock'), ]) ->tag('messenger.transport_factory') + ->set('scheduler.event_listener', DispatchSchedulerEventListener::class) + ->args([ + tagged_locator('scheduler.schedule_provider', 'name'), + service('event_dispatcher'), + ]) + ->tag('kernel.event_subscriber') ; }; diff --git a/src/Symfony/Component/Scheduler/CHANGELOG.md b/src/Symfony/Component/Scheduler/CHANGELOG.md index bec878751933..5514e3a95275 100644 --- a/src/Symfony/Component/Scheduler/CHANGELOG.md +++ b/src/Symfony/Component/Scheduler/CHANGELOG.md @@ -13,6 +13,8 @@ CHANGELOG * Add `ScheduledStamp` to `RedispatchMessage` * Allow modifying Schedule instances at runtime * Add `MessageProviderInterface` to trigger unique messages at runtime + * Add `PreRunEvent` and `PostRunEvent` events + * Add `DispatchSchedulerEventListener` 6.3 --- diff --git a/src/Symfony/Component/Scheduler/DependencyInjection/AddScheduleMessengerPass.php b/src/Symfony/Component/Scheduler/DependencyInjection/AddScheduleMessengerPass.php index 7b99bbdee60c..1fa0d81e1be6 100644 --- a/src/Symfony/Component/Scheduler/DependencyInjection/AddScheduleMessengerPass.php +++ b/src/Symfony/Component/Scheduler/DependencyInjection/AddScheduleMessengerPass.php @@ -30,6 +30,10 @@ class AddScheduleMessengerPass implements CompilerPassInterface { public function process(ContainerBuilder $container): void { + if (!$container->has('event_dispatcher')) { + $container->removeDefinition('scheduler.event_listener'); + } + $receivers = []; foreach ($container->findTaggedServiceIds('messenger.receiver') as $tags) { $receivers[$tags[0]['alias']] = true; diff --git a/src/Symfony/Component/Scheduler/Event/PostRunEvent.php b/src/Symfony/Component/Scheduler/Event/PostRunEvent.php new file mode 100644 index 000000000000..d5a71021edca --- /dev/null +++ b/src/Symfony/Component/Scheduler/Event/PostRunEvent.php @@ -0,0 +1,40 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Scheduler\Event; + +use Symfony\Component\Scheduler\Generator\MessageContext; +use Symfony\Component\Scheduler\ScheduleProviderInterface; + +class PostRunEvent +{ + public function __construct( + private readonly ScheduleProviderInterface $schedule, + private readonly MessageContext $messageContext, + private readonly object $message, + ) { + } + + public function getMessageContext(): MessageContext + { + return $this->messageContext; + } + + public function getSchedule(): ScheduleProviderInterface + { + return $this->schedule; + } + + public function getMessage(): object + { + return $this->message; + } +} diff --git a/src/Symfony/Component/Scheduler/Event/PreRunEvent.php b/src/Symfony/Component/Scheduler/Event/PreRunEvent.php new file mode 100644 index 000000000000..4da4f9732a3e --- /dev/null +++ b/src/Symfony/Component/Scheduler/Event/PreRunEvent.php @@ -0,0 +1,51 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Scheduler\Event; + +use Symfony\Component\Scheduler\Generator\MessageContext; +use Symfony\Component\Scheduler\ScheduleProviderInterface; + +class PreRunEvent +{ + private bool $shouldCancel = false; + + public function __construct( + private readonly ScheduleProviderInterface $schedule, + private readonly MessageContext $messageContext, + private readonly object $message, + ) { + } + + public function getMessageContext(): MessageContext + { + return $this->messageContext; + } + + public function getSchedule(): ScheduleProviderInterface + { + return $this->schedule; + } + + public function getMessage(): object + { + return $this->message; + } + + public function shouldCancel(bool $shouldCancel = null): bool + { + if (null !== $shouldCancel) { + $this->shouldCancel = $shouldCancel; + } + + return $this->shouldCancel; + } +} diff --git a/src/Symfony/Component/Scheduler/EventListener/DispatchSchedulerEventListener.php b/src/Symfony/Component/Scheduler/EventListener/DispatchSchedulerEventListener.php new file mode 100644 index 000000000000..a71e093e55d3 --- /dev/null +++ b/src/Symfony/Component/Scheduler/EventListener/DispatchSchedulerEventListener.php @@ -0,0 +1,73 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Scheduler\EventListener; + +use Psr\Container\ContainerInterface; +use Psr\EventDispatcher\EventDispatcherInterface; +use Symfony\Component\EventDispatcher\EventSubscriberInterface; +use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent; +use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Scheduler\Event\PostRunEvent; +use Symfony\Component\Scheduler\Event\PreRunEvent; +use Symfony\Component\Scheduler\Messenger\ScheduledStamp; + +class DispatchSchedulerEventListener implements EventSubscriberInterface +{ + public function __construct( + private readonly ContainerInterface $scheduleProviderLocator, + private readonly EventDispatcherInterface $eventDispatcher, + ) { + } + + public function onMessageHandled(WorkerMessageHandledEvent $event): void + { + $envelope = $event->getEnvelope(); + if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) { + return; + } + + if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) { + return; + } + + $this->eventDispatcher->dispatch(new PostRunEvent($this->scheduleProviderLocator->get($scheduledStamp->messageContext->name), $scheduledStamp->messageContext, $envelope->getMessage())); + } + + public function onMessageReceived(WorkerMessageReceivedEvent $event): void + { + $envelope = $event->getEnvelope(); + + if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) { + return; + } + + if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) { + return; + } + + $preRunEvent = new PreRunEvent($this->scheduleProviderLocator->get($scheduledStamp->messageContext->name), $scheduledStamp->messageContext, $envelope->getMessage()); + + $this->eventDispatcher->dispatch($preRunEvent); + + if ($preRunEvent->shouldCancel()) { + $event->shouldHandle(false); + } + } + + public static function getSubscribedEvents(): array + { + return [ + WorkerMessageReceivedEvent::class => ['onMessageReceived'], + WorkerMessageHandledEvent::class => ['onMessageHandled'], + ]; + } +} diff --git a/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php b/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php index 0e81e988f231..8266ac74a825 100644 --- a/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php +++ b/src/Symfony/Component/Scheduler/Generator/MessageGenerator.php @@ -93,6 +93,11 @@ public function getMessages(): \Generator $checkpoint->release($now, $this->waitUntil); } + public function getSchedule(): Schedule + { + return $this->schedule ??= $this->scheduleProvider->getSchedule(); + } + private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): TriggerHeap { if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) { @@ -101,7 +106,7 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): $heap = new TriggerHeap($time); - foreach ($this->schedule()->getRecurringMessages() as $index => $recurringMessage) { + foreach ($this->getSchedule()->getRecurringMessages() as $index => $recurringMessage) { $trigger = $recurringMessage->getTrigger(); if ($trigger instanceof StatefulTriggerInterface) { @@ -118,13 +123,8 @@ private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): return $this->triggerHeap = $heap; } - private function schedule(): Schedule - { - return $this->schedule ??= $this->scheduleProvider->getSchedule(); - } - private function checkpoint(): Checkpoint { - return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->schedule()->getLock(), $this->schedule()->getState()); + return $this->checkpoint ??= new Checkpoint('scheduler_checkpoint_'.$this->name, $this->getSchedule()->getLock(), $this->getSchedule()->getState()); } } diff --git a/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php b/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php index df57ef7c2fa0..3815588a85ed 100644 --- a/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php +++ b/src/Symfony/Component/Scheduler/Messenger/SchedulerTransport.php @@ -54,4 +54,9 @@ public function send(Envelope $envelope): Envelope { throw new LogicException(sprintf('"%s" cannot send messages.', __CLASS__)); } + + public function getMessageGenerator(): MessageGeneratorInterface + { + return $this->messageGenerator; + } } diff --git a/src/Symfony/Component/Scheduler/Schedule.php b/src/Symfony/Component/Scheduler/Schedule.php index 422aa4dc74d2..f784e3433678 100644 --- a/src/Symfony/Component/Scheduler/Schedule.php +++ b/src/Symfony/Component/Scheduler/Schedule.php @@ -11,21 +11,29 @@ namespace Symfony\Component\Scheduler; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Lock\LockInterface; +use Symfony\Component\Scheduler\Event\PostRunEvent; +use Symfony\Component\Scheduler\Event\PreRunEvent; use Symfony\Component\Scheduler\Exception\LogicException; use Symfony\Contracts\Cache\CacheInterface; final class Schedule implements ScheduleProviderInterface { + public function __construct( + private readonly ?EventDispatcherInterface $dispatcher = null, + ) { + } + /** @var array */ private array $messages = []; private ?LockInterface $lock = null; private ?CacheInterface $state = null; private bool $shouldRestart = false; - public static function with(RecurringMessage $message, RecurringMessage ...$messages): static + public function with(RecurringMessage $message, RecurringMessage ...$messages): static { - return static::doAdd(new self(), $message, ...$messages); + return static::doAdd(new self($this->dispatcher), $message, ...$messages); } /** @@ -62,6 +70,17 @@ public function remove(RecurringMessage $message): static return $this; } + /** + * @return $this + */ + public function removeById(string $id): static + { + unset($this->messages[$id]); + $this->setRestart(true); + + return $this; + } + /** * @return $this */ @@ -119,6 +138,20 @@ public function getSchedule(): static return $this; } + public function before(callable $listener, int $priority = 0): static + { + $this->dispatcher->addListener(PreRunEvent::class, $listener, $priority); + + return $this; + } + + public function after(callable $listener, int $priority = 0): static + { + $this->dispatcher->addListener(PostRunEvent::class, $listener, $priority); + + return $this; + } + public function shouldRestart(): bool { return $this->shouldRestart; diff --git a/src/Symfony/Component/Scheduler/Scheduler.php b/src/Symfony/Component/Scheduler/Scheduler.php index b3da60abbf2b..4b6ecc285fa6 100644 --- a/src/Symfony/Component/Scheduler/Scheduler.php +++ b/src/Symfony/Component/Scheduler/Scheduler.php @@ -11,8 +11,11 @@ namespace Symfony\Component\Scheduler; +use Psr\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Clock\Clock; use Symfony\Component\Clock\ClockInterface; +use Symfony\Component\Scheduler\Event\PostRunEvent; +use Symfony\Component\Scheduler\Event\PreRunEvent; use Symfony\Component\Scheduler\Generator\MessageGenerator; final class Scheduler @@ -31,6 +34,7 @@ public function __construct( private readonly array $handlers, array $schedules, private readonly ClockInterface $clock = new Clock(), + private readonly ?EventDispatcherInterface $dispatcher = null, ) { foreach ($schedules as $schedule) { $this->addSchedule($schedule); @@ -62,9 +66,25 @@ public function run(array $options = []): void $ran = false; foreach ($this->generators as $generator) { - foreach ($generator->getMessages() as $message) { + foreach ($generator->getMessages() as $context => $message) { + if (!$this->dispatcher) { + $this->handlers[$message::class]($message); + $ran = true; + + continue; + } + + $preRunEvent = new PreRunEvent($generator->getSchedule(), $context, $message); + $this->dispatcher->dispatch($preRunEvent); + + if ($preRunEvent->shouldCancel()) { + continue; + } + $this->handlers[$message::class]($message); $ran = true; + + $this->dispatcher->dispatch(new PostRunEvent($generator->getSchedule(), $context, $message)); } } diff --git a/src/Symfony/Component/Scheduler/Tests/EventListener/DispatchSchedulerEventListenerTest.php b/src/Symfony/Component/Scheduler/Tests/EventListener/DispatchSchedulerEventListenerTest.php new file mode 100644 index 000000000000..e8785add9881 --- /dev/null +++ b/src/Symfony/Component/Scheduler/Tests/EventListener/DispatchSchedulerEventListenerTest.php @@ -0,0 +1,73 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\EventListener; + +use PHPUnit\Framework\TestCase; +use Psr\Container\ContainerInterface; +use Symfony\Component\EventDispatcher\EventDispatcher; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Scheduler\Event\PreRunEvent; +use Symfony\Component\Scheduler\EventListener\DispatchSchedulerEventListener; +use Symfony\Component\Scheduler\Generator\MessageContext; +use Symfony\Component\Scheduler\Messenger\ScheduledStamp; +use Symfony\Component\Scheduler\RecurringMessage; +use Symfony\Component\Scheduler\Tests\Messenger\SomeScheduleProvider; +use Symfony\Component\Scheduler\Trigger\TriggerInterface; + +class DispatchSchedulerEventListenerTest extends TestCase +{ + public function testDispatchSchedulerEvents() + { + $trigger = $this->createMock(TriggerInterface::class); + $defaultRecurringMessage = RecurringMessage::trigger($trigger, (object) ['id' => 'default']); + + $schedulerProvider = new SomeScheduleProvider([$defaultRecurringMessage]); + $scheduleProviderLocator = $this->createMock(ContainerInterface::class); + $scheduleProviderLocator->expects($this->once())->method('has')->willReturn(true); + $scheduleProviderLocator->expects($this->once())->method('get')->willReturn($schedulerProvider); + + $context = new MessageContext('default', 'default', $trigger, $this->createMock(\DateTimeImmutable::class)); + $envelope = (new Envelope(new \stdClass()))->with(new ScheduledStamp($context)); + + /** @var ContainerInterface $scheduleProviderLocator */ + $listener = new DispatchSchedulerEventListener($scheduleProviderLocator, $eventDispatcher = new EventDispatcher()); + $workerReceivedEvent = new WorkerMessageReceivedEvent($envelope, 'default'); + $secondListener = new TestEventListener(); + + $eventDispatcher->addListener(PreRunEvent::class, [$secondListener, 'preRun']); + $eventDispatcher->addListener(PreRunEvent::class, [$secondListener, 'postRun']); + $listener->onMessageReceived($workerReceivedEvent); + + $this->assertTrue($secondListener->preInvoked); + $this->assertTrue($secondListener->postInvoked); + } +} + +class TestEventListener +{ + public string $name; + public bool $preInvoked = false; + public bool $postInvoked = false; + + /* Listener methods */ + + public function preRun($e) + { + $this->preInvoked = true; + } + + public function postRun($e) + { + $this->postInvoked = true; + } +} diff --git a/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php b/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php index 01522288f2a9..e100ff2e6c0c 100644 --- a/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php +++ b/src/Symfony/Component/Scheduler/Tests/Generator/MessageGeneratorTest.php @@ -124,7 +124,7 @@ public function testGetMessagesFromScheduleProviderWithRestart() public function __construct(array $schedule) { - $this->schedule = Schedule::with(...$schedule); + $this->schedule = (new Schedule())->with(...$schedule); $this->schedule->stateful(new ArrayAdapter()); }