From 61646e2040e50603e4fe3411f50801a892fb3a2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Treffler?= Date: Sun, 5 Jan 2025 17:52:55 +0100 Subject: [PATCH 1/2] [Doctrine][Messenger] Prevents multiple TransportMessageIdStamp being stored in envelope - Multiple TransportMessageIdStamps can heavily increase message size on multiple retries, to prevent that when message is fetched existing TransportMessageIdStamps are discarded before new one is added. - Replaces static calls to \PHPUnit\Framework\TestCase::expectException in DoctrineReceiverTest test with instance calls. Fixes: https://github.com/symfony/symfony/issues/49637 --- Tests/Transport/DoctrineReceiverTest.php | 83 +++++++++++++++++++++--- Transport/DoctrineReceiver.php | 10 +-- 2 files changed, 81 insertions(+), 12 deletions(-) diff --git a/Tests/Transport/DoctrineReceiverTest.php b/Tests/Transport/DoctrineReceiverTest.php index 36ee145..7dcb190 100644 --- a/Tests/Transport/DoctrineReceiverTest.php +++ b/Tests/Transport/DoctrineReceiverTest.php @@ -28,6 +28,8 @@ use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Serializer as SerializerComponent; use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer; +use Symfony\Component\Serializer\Normalizer\DateTimeNormalizer; use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; class DoctrineReceiverTest extends TestCase @@ -100,6 +102,23 @@ public function testOccursRetryableExceptionFromConnection() $receiver->get(); } + public function testGetReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('get')->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->get(); + /** @var Envelope $actualEnvelope */ + $actualEnvelope = $actualEnvelopes[0]; + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + public function testAll() { $serializer = $this->createSerializer(); @@ -115,6 +134,24 @@ public function testAll() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } + public function testAllReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope1 = $this->createRetriedDoctrineEnvelope(); + $doctrineEnvelope2 = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('findAll')->willReturn([$doctrineEnvelope1, $doctrineEnvelope2]); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelopes = $receiver->all(); + foreach ($actualEnvelopes as $actualEnvelope) { + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + } + public function testFind() { $serializer = $this->createSerializer(); @@ -128,6 +165,21 @@ public function testFind() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); } + public function testFindReplacesExistingTransportMessageIdStamps() + { + $serializer = $this->createSerializer(); + + $doctrineEnvelope = $this->createRetriedDoctrineEnvelope(); + $connection = $this->createMock(Connection::class); + $connection->method('find')->with(3)->willReturn($doctrineEnvelope); + + $receiver = new DoctrineReceiver($connection, $serializer); + $actualEnvelope = $receiver->find(3); + $messageIdStamps = $actualEnvelope->all(TransportMessageIdStamp::class); + + $this->assertCount(1, $messageIdStamps); + } + public function testAck() { $serializer = $this->createSerializer(); @@ -195,7 +247,7 @@ public function testAckThrowsRetryableExceptionAndRetriesFail() ->with('1') ->willThrowException($deadlockException); - self::expectException(TransportException::class); + $this->expectException(TransportException::class); $receiver->ack($envelope); } @@ -215,7 +267,7 @@ public function testAckThrowsException() ->with('1') ->willThrowException($exception); - self::expectException($exception::class); + $this->expectException($exception::class); $receiver->ack($envelope); } @@ -286,7 +338,7 @@ public function testRejectThrowsRetryableExceptionAndRetriesFail() ->with('1') ->willThrowException($deadlockException); - self::expectException(TransportException::class); + $this->expectException(TransportException::class); $receiver->reject($envelope); } @@ -306,7 +358,7 @@ public function testRejectThrowsException() ->with('1') ->willThrowException($exception); - self::expectException($exception::class); + $this->expectException($exception::class); $receiver->reject($envelope); } @@ -321,12 +373,27 @@ private function createDoctrineEnvelope(): array ]; } + private function createRetriedDoctrineEnvelope(): array + { + return [ + 'id' => 3, + 'body' => '{"message": "Hi"}', + 'headers' => [ + 'type' => DummyMessage::class, + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\BusNameStamp' => '[{"busName":"messenger.bus.default"}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\TransportMessageIdStamp' => '[{"id":1},{"id":2}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\ErrorDetailsStamp' => '[{"exceptionClass":"Symfony\\\\Component\\\\Messenger\\\\Exception\\\\RecoverableMessageHandlingException","exceptionCode":0,"exceptionMessage":"","flattenException":null}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\DelayStamp' => '[{"delay":1000},{"delay":1000}]', + 'X-Message-Stamp-Symfony\Component\Messenger\Stamp\RedeliveryStamp' => '[{"retryCount":1,"redeliveredAt":"2025-01-05T13:58:25+00:00"},{"retryCount":2,"redeliveredAt":"2025-01-05T13:59:26+00:00"}]', + 'Content-Type' => 'application/json', + ], + ]; + } + private function createSerializer(): Serializer { - $serializer = new Serializer( - new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + return new Serializer( + new SerializerComponent\Serializer([new DateTimeNormalizer(), new ArrayDenormalizer(), new ObjectNormalizer()], ['json' => new JsonEncoder()]) ); - - return $serializer; } } diff --git a/Transport/DoctrineReceiver.php b/Transport/DoctrineReceiver.php index 20bd611..85bd607 100644 --- a/Transport/DoctrineReceiver.php +++ b/Transport/DoctrineReceiver.php @@ -141,10 +141,12 @@ private function createEnvelopeFromData(array $data): Envelope throw $exception; } - return $envelope->with( - new DoctrineReceivedStamp($data['id']), - new TransportMessageIdStamp($data['id']) - ); + return $envelope + ->withoutAll(TransportMessageIdStamp::class) + ->with( + new DoctrineReceivedStamp($data['id']), + new TransportMessageIdStamp($data['id']) + ); } private function withRetryableExceptionRetry(callable $callable): void From 5437d2cde244a4a4b657bc7f3aceb1ce6c56ea7f Mon Sep 17 00:00:00 2001 From: HypeMC Date: Fri, 3 Jan 2025 03:50:44 +0100 Subject: [PATCH 2/2] [Messenger] Fix `TransportMessageIdStamp` not always added --- Tests/Transport/DoctrineReceiverTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/Transport/DoctrineReceiverTest.php b/Tests/Transport/DoctrineReceiverTest.php index 7dcb190..fcf4d67 100644 --- a/Tests/Transport/DoctrineReceiverTest.php +++ b/Tests/Transport/DoctrineReceiverTest.php @@ -47,7 +47,7 @@ public function testItReturnsTheDecodedMessageToTheHandler() $this->assertCount(1, $actualEnvelopes); /** @var Envelope $actualEnvelope */ $actualEnvelope = $actualEnvelopes[0]; - $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage()); /** @var DoctrineReceivedStamp $doctrineReceivedStamp */ $doctrineReceivedStamp = $actualEnvelope->last(DoctrineReceivedStamp::class);