8000 [Scheduler] Add `FailureEvent` by alli83 · Pull Request #52087 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Scheduler] Add FailureEvent #52087

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Symfony/Component/Scheduler/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ CHANGELOG
* Add `MessageProviderInterface` to trigger unique messages at runtime
* Add `PreRunEvent` and `PostRunEvent` events
* Add `DispatchSchedulerEventListener`
* Add `FailureEvent` event

6.3
---
Expand Down
57 changes: 57 additions & 0 deletions src/Symfony/Component/Scheduler/Event/FailureEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Scheduler\Event;

use Symfony\Component\Scheduler\Generator\MessageContext;
use Symfony\Component\Scheduler\ScheduleProviderInterface;

class FailureEvent
{
private bool $shouldIgnore = false;

public function __construct(
private readonly ScheduleProviderInterface $schedule,
private readonly MessageContext $messageContext,
private readonly object $message,
private readonly \Throwable $error,
) {
}

public function getMessageContext(): MessageContext
{
return $this->messageContext;
}

public function getSchedule(): ScheduleProviderInterface
{
return $this->schedule;
}

public function getMessage(): object
{
return $this->message;
}

public function getError(): \Throwable
{
return $this->error;
}

public function shouldIgnore(bool $shouldIgnore = null): bool
{
if (null !== $shouldIgnore) {
$this->shouldIgnore = $shouldIgnore;
}

return $this->shouldIgnore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Scheduler\Event\FailureEvent;
use Symfony\Component\Scheduler\Event\PostRunEvent;
use Symfony\Component\Scheduler\Event\PreRunEvent;
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
Expand All @@ -31,11 +35,8 @@ public function __construct(
public function onMessageHandled(WorkerMessageHandledEvent $event): void
{
$envelope = $event->getEnvelope();
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
return;
}

if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) {
if (!$scheduledStamp = $this->getScheduledStamp($envelope)) {
return;
}

Expand All @@ -46,11 +47,7 @@ public function onMessageReceived(WorkerMessageReceivedEvent $event): void
{
$envelope = $event->getEnvelope();

if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
return;
}

if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) {
if (!$scheduledStamp = $this->getScheduledStamp($envelope)) {
return;
}

Expand All @@ -63,11 +60,36 @@ public function onMessageReceived(WorkerMessageReceivedEvent $event): void
}
}

public function onMessageFailed(WorkerMessageFailedEvent $event): void
{
$envelope = $event->getEnvelope();

if (!$scheduledStamp = $this->getScheduledStamp($envelope)) {
return;
}

$this->eventDispatcher->dispatch(new FailureEvent($this->scheduleProviderLocator->get($scheduledStamp->messageContext->name), $scheduledStamp->messageContext, $envelope->getMessage(), $event->getThrowable()));
}

private function getScheduledStamp(Envelope $envelope): ?StampInterface
{
if (!$scheduledStamp = $envelope->last(ScheduledStamp::class)) {
return null;
}

if (!$this->scheduleProviderLocator->has($scheduledStamp->messageContext->name)) {
return null;
}

return $scheduledStamp;
}

public static function getSubscribedEvents(): array
{
return [
WorkerMessageReceivedEvent::class => ['onMessageReceived'],
WorkerMessageHandledEvent::class => ['onMessageHandled'],
WorkerMessageFailedEvent::class => ['onMessageFailed'],
];
}
}
8 changes: 8 additions & 0 deletions src/Symfony/Component/Scheduler/Schedule.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Scheduler\Event\FailureEvent;
use Symfony\Component\Scheduler\Event\PostRunEvent;
use Symfony\Component\Scheduler\Event\PreRunEvent;
use Symfony\Component\Scheduler\Exception\LogicException;
Expand Down Expand Up @@ -152,6 +153,13 @@ public function after(callable $listener, int $priority = 0): static
return $this;
}

public function onFailure(callable $listener, int $priority = 0): static
{
$this->dispatcher->addListener(FailureEvent::class, $listener, $priority);

return $this;
}

public function shouldRestart(): bool
{
return $this->shouldRestart;
Expand Down
16 changes: 13 additions & 3 deletions src/Symfony/Component/Scheduler/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Psr\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Clock\Clock;
use Symfony\Component\Clock\ClockInterface;
use Symfony\Component\Scheduler\Event\FailureEvent;
use Symfony\Component\Scheduler\Event\PostRunEvent;
use Symfony\Component\Scheduler\Event\PreRunEvent;
use Symfony\Component\Scheduler\Generator\MessageGenerator;
Expand Down Expand Up @@ -81,10 +82,19 @@ public function run(array $options = []): void
continue;
}

$this->handlers[$message::class]($message);
$ran = true;
try {
$this->handlers[$message::class]($message);
$ran = true;

$this->dispatcher->dispatch(new PostRunEvent($generator->getSchedule(), $context, $message));
$this->dispatcher->dispatch(new PostRunEvent($generator->getSchedule(), $context, $message));
} catch (\Throwable $error) {
$failureEvent = new FailureEvent($generator->getSchedule(), $context, $message, $error);
$this->dispatcher->dispatch($failureEvent);

if (!$failureEvent->shouldIgnore()) {
throw $error;
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
use Psr\Container\ContainerInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Scheduler\Event\FailureEvent;
use Symfony\Component\Scheduler\Event\PostRunEvent;
use Symfony\Component\Scheduler\Event\PreRunEvent;
use Symfony\Component\Scheduler\EventListener\DispatchSchedulerEventListener;
Expand Down Expand Up @@ -45,15 +47,19 @@ public function testDispatchSchedulerEvents()
$listener = new DispatchSchedulerEventListener($scheduleProviderLocator, $eventDispatcher = new EventDispatcher());
$workerReceivedEvent = new WorkerMessageReceivedEvent($envelope, 'default');
$workerHandledEvent = new WorkerMessageHandledEvent($envelope, 'default');
$workerFailedEvent = new WorkerMessageFailedEvent($envelope, 'default', new \Exception());
$secondListener = new TestEventListener();

$eventDispatcher->addListener(PreRunEvent::class, [$secondListener, 'preRun']);
$eventDispatcher->addListener(PostRunEvent::class, [$secondListener, 'postRun']);
$eventDispatcher->addListener(FailureEvent::class, [$secondListener, 'onFailure']);
$listener->onMessageReceived($workerReceivedEvent);
$listener->onMessageHandled($workerHandledEvent);
$listener->onMessageFailed($workerFailedEvent);

$this->assertTrue($secondListener->preInvoked);
$this->assertTrue($secondListener->postInvoked);
$this->assertTrue($secondListener->failureInvoked);
}
}

Expand All @@ -62,6 +68,7 @@ class TestEventListener
public string $name;
public bool $preInvoked = false;
public bool $postInvoked = false;
public bool $failureInvoked = false;

/* Listener methods */

Expand All @@ -74,4 +81,9 @@ public function postRun($e)
{
$this->postInvoked = true;
}

public function onFailure($e)
{
$this->failureInvoked = true;
}
}
0