14
14
use Psr \Container \ContainerInterface ;
15
15
use Psr \EventDispatcher \EventDispatcherInterface ;
16
16
use Symfony \Component \EventDispatcher \EventSubscriberInterface ;
17
+ use Symfony \Component \Messenger \Envelope ;
18
+ use Symfony \Component \Messenger \Event \WorkerMessageFailedEvent ;
17
19
use Symfony \Component \Messenger \Event \WorkerMessageHandledEvent ;
18
20
use Symfony \Component \Messenger \Event \WorkerMessageReceivedEvent ;
21
+ use Symfony \Component \Messenger \Stamp \StampInterface ;
22
+ use Symfony \Component \Scheduler \Event \FailureEvent ;
19
23
use Symfony \Component \Scheduler \Event \PostRunEvent ;
20
24
use Symfony \Component \Scheduler \Event \PreRunEvent ;
21
25
use Symfony \Component \Scheduler \Messenger \ScheduledStamp ;
@@ -31,11 +35,8 @@ public function __construct(
31
35
public function onMessageHandled (WorkerMessageHandledEvent $ event ): void
32
36
{
33
37
$ envelope = $ event ->getEnvelope ();
34
- if (!$ scheduledStamp = $ envelope ->last (ScheduledStamp::class)) {
35
- return ;
36
- }
37
38
38
- if (!$ this ->scheduleProviderLocator -> has ( $ scheduledStamp -> messageContext -> name )) {
39
+ if (!$ scheduledStamp = $ this ->getScheduledStamp ( $ envelope )) {
39
40
return ;
40
41
}
41
42
@@ -46,11 +47,7 @@ public function onMessageReceived(WorkerMessageReceivedEvent $event): void
46
47
{
47
48
$ envelope = $ event ->getEnvelope ();
48
49
49
- if (!$ scheduledStamp = $ envelope ->last (ScheduledStamp::class)) {
50
- return ;
51
- }
52
-
53
- if (!$ this ->scheduleProviderLocator ->has ($ scheduledStamp ->messageContext ->name )) {
50
+ if (!$ scheduledStamp = $ this ->getScheduledStamp ($ envelope )) {
54
51
return ;
55
52
}
56
53
@@ -63,11 +60,36 @@ public function onMessageReceived(WorkerMessageReceivedEvent $event): void
63
60
}
64
61
}
65
62
63
+ public function onMessageFailed (WorkerMessageFailedEvent $ event ): void
64
+ {
65
+ $ envelope = $ event ->getEnvelope ();
66
+
67
+ if (!$ scheduledStamp = $ this ->getScheduledStamp ($ envelope )) {
68
+ return ;
69
+ }
70
+
71
+ $ this ->eventDispatcher ->dispatch (new FailureEvent ($ this ->scheduleProviderLocator ->get ($ scheduledStamp ->messageContext ->name ), $ scheduledStamp ->messageContext , $ envelope ->getMessage (), $ event ->getThrowable ()));
72
+ }
73
+
74
+ private function getScheduledStamp (Envelope $ envelope ): ?StampInterface
75
+ {
76
+ if (!$ scheduledStamp = $ envelope ->last (ScheduledStamp::class)) {
77
+ return null ;
78
+ }
79
+
80
+ if (!$ this ->scheduleProviderLocator ->has ($ scheduledStamp ->messageContext ->name )) {
81
+ return null ;
82
+ }
83
+
84
+ return $ scheduledStamp ;
85
+ }
86
+
66
87
public static function getSubscribedEvents (): array
67
88
{
68
89
return [
69
90
WorkerMessageReceivedEvent::class => ['onMessageReceived ' ],
70
91
WorkerMessageHandledEvent::class => ['onMessageHandled ' ],
92
+ WorkerMessageFailedEvent::class => ['onMessageFailed ' ],
71
93
];
72
94
}
73
95
}
0 commit comments