|
15 | 15 | use Psr\Log\NullLogger;
|
16 | 16 | use Symfony\Component\Messenger\Envelope;
|
17 | 17 | use Symfony\Component\Messenger\Exception\HandlerFailedException;
|
| 18 | +use Symfony\Component\Messenger\Exception\LogicException; |
18 | 19 | use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
|
| 20 | +use Symfony\Component\Messenger\Handler\Acknowledger; |
19 | 21 | use Symfony\Component\Messenger\Handler\HandlerDescriptor;
|
20 | 22 | use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
|
| 23 | +use Symfony\Component\Messenger\Stamp\AckStamp; |
21 | 24 | use Symfony\Component\Messenger\Stamp\HandledStamp;
|
| 25 | +use Symfony\Component\Messenger\Stamp\NoAutoAckStamp; |
22 | 26 |
|
23 | 27 | /**
|
24 | 28 | * @author Samuel Roze <samuel.roze@gmail.com>
|
@@ -60,7 +64,40 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
60 | 64 |
|
61 | 65 | try {
|
62 | 66 | $handler = $handlerDescriptor->getHandler();
|
63 |
| - $handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message)); |
| 67 | + $batchHandler = $handlerDescriptor->getBatchHandler(); |
| 68 | + $ack = null; |
| 69 | + |
| 70 | + if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) { |
| 71 | + $ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) { |
| 72 | + if (null !== $e) { |
| 73 | + $e = new HandlerFailedException($envelope, [$e]); |
| 74 | + } else { |
| 75 | + $envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result)); |
| 76 | + } |
| 77 | + |
| 78 | + $ackStamp->ack($envelope, $e); |
| 79 | + }); |
| 80 | + } |
| 81 | + |
| 82 | + if (null === $ack) { |
| 83 | + $handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message)); |
| 84 | + } else { |
| 85 | + $batchSize = $handler($message, $ack); |
| 86 | + |
| 87 | + if (!\is_int($batchSize) || 0 > $batchSize) { |
| 88 | + throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($batchSize) ? $batchSize : get_debug_type($batchSize), get_debug_type($batchHandler))); |
| 89 | + } |
| 90 | + |
| 91 | + if (!$ack->isAcknowledged()) { |
| 92 | + $envelope = $envelope->with(new NoAutoAckStamp()); |
| 93 | + $handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $batchSize); |
| 94 | + } elseif ($ack->getError()) { |
| 95 | + throw $ack->getError(); |
| 96 | + } else { |
| 97 | + $handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $ack->getResult()); |
| 98 | + } |
| 99 | + } |
| 100 | + |
64 | 101 | $envelope = $envelope->with($handledStamp);
|
65 | 102 | $this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
|
66 | 103 | } catch (\Throwable $e) {
|
|