diff --git a/Tests/Transport/DoctrineReceiverTest.php b/Tests/Transport/DoctrineReceiverTest.php index 744c76c..d81e7cd 100644 --- a/Tests/Transport/DoctrineReceiverTest.php +++ b/Tests/Transport/DoctrineReceiverTest.php @@ -26,6 +26,8 @@ use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer; +use Symfony\Component\Serializer\Normalizer\DateTimeNormalizer; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; class DoctrineReceiverTest extends TestCase @@ -43,7 +45,7 @@ public function testItReturnsTheDecodedMessageToTheHandler() $this->assertCount(1, $actualEnvelopes); /** @var Envelope $actualEnvelope */ $actualEnvelope = $actualEnvelopes[0]; - $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); /** @var DoctrineReceivedStamp $doctrineReceivedStamp */ $doctrineReceivedStamp = $actualEnvelope->last(DoctrineReceivedStamp::class); @@ -91,6 +93,23 @@ public function testOccursRetryableExceptionFromConnection() $receiver->get(); } + public function testGetReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('get')->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->get(); + /** @var Envelope $actualEnvelope */ + $actualEnvelope = $actualEnvelopes[0]; + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + public function testAll() { $serializer = $this->createSerializer(); @@ -106,6 +125,24 @@ public function testAll() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } + public function testAllReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope1 = $this->createRetriedDoctrineEnvelope(); + $doctrineEnvelope2 = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('findAll')->willReturn([$doctrineEnvelope1, $doctrineEnvelope2]); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->all(); + foreach ($actualEnvelopes as $actualEnvelope) { + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + } + public function testFind() { $serializer = $this->createSerializer(); @@ -119,6 +156,21 @@ public function testFind() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); } + public function testFindReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('find')->with(3)->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelope = $receiver->find(3); + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + public function testAck() { $serializer = $this->createSerializer(); @@ -186,7 +238,7 @@ public function testAckThrowsRetryableExceptionAndRetriesFail() ->with('1') ->willThrowException($deadlockException); - self::expectException(TransportException::class); + $this->expectException(TransportException::class); $receiver->ack($envelope); } @@ -206,7 +258,7 @@ public function testAckThrowsException() ->with('1') ->willThrowException($exception); - self::expectException($exception::class); + $this->expectException($exception::class); $receiver->ack($envelope); } @@ -277,7 +329,7 @@ public function testRejectThrowsRetryableExceptionAndRetriesFail() ->with('1') ->willThrowException($deadlockException); - self::expectException(TransportException::class); + $this->expectException(TransportException::class); $receiver->reject($envelope); } @@ -297,7 +349,7 @@ public function testRejectThrowsException() ->with('1') ->willThrowException($exception); - self::expectException($exception::class); + $this->expectException($exception::class); $receiver->reject($envelope); } @@ -312,10 +364,27 @@ private function createDoctrineEnvelope(): array ]; } + private function createRetriedDoctrineEnvelope(): array + { + return [ + 'id' => 3, + 'body' => '{"message": "Hi"}', + 'headers' => [ + 'type' => DummyMessage::class, + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\BusNameStamp' => '[{"busName":"messenger.bus.default"}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\TransportMessageIdStamp' => '[{"id":1},{"id":2}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\ErrorDetailsStamp' => '[{"exceptionClass":"Symfony\\\\Component\\\\Messenger\\\\Exception\\\\RecoverableMessageHandlingException","exceptionCode":0,"exceptionMessage":"","flattenException":null}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\DelayStamp' => '[{"delay":1000},{"delay":1000}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\RedeliveryStamp' => '[{"retryCount":1,"redeliveredAt":"2025-01-05T13:58:25+00:00"},{"retryCount":2,"redeliveredAt":"2025-01-05T13:59:26+00:00"}]', + 'Content-Type' => 'application/json', + ], + ]; + } + private function createSerializer(): Serializer { return new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + new SerializerComponent\Serializer([new DateTimeNormalizer(), new ArrayDenormalizer(), new ObjectNormalizer()], ['json' => new JsonEncoder()]) ); } } diff --git a/Transport/DoctrineReceiver.php b/Transport/DoctrineReceiver.php index e999a57..19cb065 100644 --- a/Transport/DoctrineReceiver.php +++ b/Transport/DoctrineReceiver.php @@ -141,10 +141,12 @@ private function createEnvelopeFromData(array $data): Envelope throw $exception; } - return $envelope->with( - new DoctrineReceivedStamp($data['id']), - new TransportMessageIdStamp($data['id']) - ); + return $envelope + ->withoutAll(TransportMessageIdStamp::class) + ->with( + new DoctrineReceivedStamp($data['id']), + new TransportMessageIdStamp($data['id']) + ); } private function withRetryableExceptionRetry(callable $callable): void