From 1025f3501bc55f5682871cc93cd02456b8c6c96b Mon Sep 17 00:00:00 2001 From: Fabien Potencier Date: Sat, 18 Mar 2023 10:18:04 +0100 Subject: [PATCH] [Messenger] Add support for the DelayStamp in InMemoryTransport --- .../InMemory/InMemoryTransportTest.php | 10 ++++++ .../Transport/InMemory/InMemoryTransport.php | 31 ++++++++++++++----- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/InMemory/InMemoryTransportTest.php b/src/Symfony/Component/Messenger/Tests/Transport/InMemory/InMemoryTransportTest.php index 97e7f227a50e1..1cd1db68beaa0 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/InMemory/InMemoryTransportTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/InMemory/InMemoryTransportTest.php @@ -13,6 +13,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; @@ -84,6 +85,15 @@ public function testQueue() $this->assertSame([], $this->transport->get()); } + public function testQueueWithDelay() + { + $envelope1 = new Envelope(new \stdClass()); + $envelope1 = $this->transport->send($envelope1); + $envelope2 = (new Envelope(new \stdClass()))->with(new DelayStamp(10_000)); + $envelope2 = $this->transport->send($envelope2); + $this->assertSame([$envelope1], $this->transport->get()); + } + public function testQueueWithSerialization() { $envelope = new Envelope(new \stdClass()); diff --git a/src/Symfony/Component/Messenger/Transport/InMemory/InMemoryTransport.php b/src/Symfony/Component/Messenger/Transport/InMemory/InMemoryTransport.php index 13a97667b548b..4937c4b325786 100644 --- a/src/Symfony/Component/Messenger/Transport/InMemory/InMemoryTransport.php +++ b/src/Symfony/Component/Messenger/Transport/InMemory/InMemoryTransport.php @@ -11,8 +11,10 @@ namespace Symfony\Component\Messenger\Transport\InMemory; +use Psr\Clock\ClockInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportInterface; @@ -46,16 +48,25 @@ class InMemoryTransport implements TransportInterface, ResetInterface private array $queue = []; private int $nextId = 1; - private ?SerializerInterface $serializer; + private array $availableAt = []; - public function __construct(SerializerInterface $serializer = null) - { - $this->serializer = $serializer; + public function __construct( + private ?SerializerInterface $serializer = null, + private ?ClockInterface $clock = null, + ) { } public function get(): iterable { - return array_values($this->decode($this->queue)); + $envelopes = []; + $now = $this->clock?->now() ?? new \DateTimeImmutable(); + foreach ($this->decode($this->queue) as $id => $envelope) { + if (!isset($this->availableAt[$id]) || $now > $this->availableAt[$id]) { + $envelopes[] = $envelope; + } + } + + return $envelopes; } public function ack(Envelope $envelope): void @@ -66,7 +77,7 @@ public function ack(Envelope $envelope): void throw new LogicException('No TransportMessageIdStamp found on the Envelope.'); } - unset($this->queue[$transportMessageIdStamp->getId()]); + unset($this->queue[$id = $transportMessageIdStamp->getId()], $this->availableAt[$id]); } public function reject(Envelope $envelope): void @@ -77,7 +88,7 @@ public function reject(Envelope $envelope): void throw new LogicException('No TransportMessageIdStamp found on the Envelope.'); } - unset($this->queue[$transportMessageIdStamp->getId()]); + unset($this->queue[$id = $transportMessageIdStamp->getId()], $this->availableAt[$id]); } public function send(Envelope $envelope): Envelope @@ -88,6 +99,12 @@ public function send(Envelope $envelope): Envelope $this->sent[] = $encodedEnvelope; $this->queue[$id] = $encodedEnvelope; + /** @var DelayStamp|null $delayStamp */ + if ($delayStamp = $envelope->last(DelayStamp::class)) { + $now = $this->clock?->now() ?? new \DateTimeImmutable(); + $this->availableAt[$id] = $now->modify(sprintf('+%d seconds', $delayStamp->getDelay() / 1000)); + } + return $envelope; }