10000 [Messenger] [AMQP] Add TransportMessageIdStamp logic for AMQP · symfony/symfony@4bcb380 · GitHub
[go: up one dir, main page]

Skip to content

Commit 4bcb380

Browse files
AurelienPillevessefabpot
authored andcommitted
[Messenger] [AMQP] Add TransportMessageIdStamp logic for AMQP
1 parent 8d4a538 commit 4bcb380

File tree

4 files changed

+127
-1
lines changed

4 files changed

+127
-1
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
1919
use Symfony\Component\Messenger\Envelope;
2020
use Symfony\Component\Messenger\Exception\TransportException;
21+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2122
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
2223
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2324
use Symfony\Component\Serializer as SerializerComponent;
2425
use Symfony\Component\Serializer\Encoder\JsonEncoder;
26+
use Symfony\Component\Serializer\Normalizer\ArrayDenormalizer;
27+
use Symfony\Component\Serializer\Normalizer\DateTimeNormalizer;
2528
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
2629

2730
/**
@@ -74,13 +77,80 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage()
7477
$receiver->reject(new Envelope(new \stdClass(), [new AmqpReceivedStamp($amqpEnvelope, 'queueName')]));
7578
}
7679

77-
private function createAMQPEnvelope(): \AMQPEnvelope
80+
public function testTransportMessageIdStampIsCreatedWhenMessageIdIsSet()
81+
{
82+
$serializer = new Serializer(
83+
new SerializerComponent\Serializer([new DateTimeNormalizer(), new ArrayDenormalizer(), new ObjectNormalizer()], ['json' => new JsonEncoder()])
84+
);
85+
86+
$id = '01946fcb-4bcb-7aa7-9727-dac1c0374443';
87+
$amqpEnvelope = $this->createAMQPEnvelope($id);
88+
89+
$connection = $this->createMock(Connection::class);
90+
$connection->method('getQueueNames')->willReturn(['queueName']);
91+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
92+
93+
$receiver = new AmqpReceiver($connection, $serializer);
94+
$actualEnvelopes = iterator_to_array($receiver->get());
95+
$this->assertCount(1, $actualEnvelopes);
96+
97+
/** @var Envelope $actualEnvelope */
98+
$actualEnvelope = $actualEnvelopes[0];
99+
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
100+
101+
/** @var AmqpReceivedStamp $amqpReceivedStamp */
102+
$amqpReceivedStamp = $actualEnvelope->last(AmqpReceivedStamp::class);
103+
$this->assertNotNull($amqpReceivedStamp);
104+
$this->assertSame($amqpEnvelope->getBody(), $amqpReceivedStamp->getAmqpEnvelope()->getBody());
105+
$this->assertSame($amqpEnvelope->getHeaders(), $amqpReceivedStamp->getAmqpEnvelope()->getHeaders());
106+
$this->assertSame($amqpEnvelope->getMessageId(), $amqpReceivedStamp->getAmqpEnvelope()->getMessageId());
107+
108+
/** @var TransportMessageIdStamp $transportMessageIdStamp */
109+
$transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
110+
$this->assertNotNull($transportMessageIdStamp);
111+
$this->assertSame($id, $transportMessageIdStamp->getId());
112+
}
113+
114+
public function testTransportMessageIdStampIsNotCreatedWhenMessageIdIsNotSet()
115+
{
116+
$serializer = new Serializer(
117+
new SerializerComponent\Serializer([new DateTimeNormalizer(), new ArrayDenormalizer(), new ObjectNormalizer()], ['json' => new JsonEncoder()])
118+
);
119+
120+
$amqpEnvelope = $this->createAMQPEnvelope();
121+
122+
$connection = $this->createMock(Connection::class);
123+
$connection->method('getQueueNames')->willReturn(['queueName']);
124+
$connection->method('get')->with('queueName')->willReturn($amqpEnvelope);
125+
126+
$receiver = new AmqpReceiver($connection, $serializer);
127+
$actualEnvelopes = iterator_to_array($receiver->get());
128+
$this->assertCount(1, $actualEnvelopes);
129+
130+
/** @var Envelope $actualEnvelope */
131+
$actualEnvelope = $actualEnvelopes[0];
132+
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
133+
134+
/** @var AmqpReceivedStamp $amqpReceivedStamp */
135+
$amqpReceivedStamp = $actualEnvelope->last(AmqpReceivedStamp::class);
136+
$this->assertNotNull($amqpReceivedStamp);
137+
$this->assertSame($amqpEnvelope->getBody(), $amqpReceivedStamp->getAmqpEnvelope()->getBody());
138+
$this->assertSame($amqpEnvelope->getHeaders(), $amqpReceivedStamp->getAmqpEnvelope()->getHeaders());
139+
$this->assertSame($amqpEnvelope->getMessageId(), $amqpReceivedStamp->getAmqpEnvelope()->getMessageId());
140+
141+
/** @var TransportMessageIdStamp $transportMessageIdStamp */
142+
$transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
143+
$this->assertNull($transportMessageIdStamp);
144+
}
145+
146+
private function createAMQPEnvelope(?string $messageId = null): \AMQPEnvelope
78147
{
79148
$envelope = $this->createMock(\AMQPEnvelope::class);
80149
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
81150
$envelope->method('getHeaders')->willReturn([
82151
'type' => DummyMessage::class,
83152
]);
153+
$envelope->method('getMessageId')->willReturn($messageId);
84154

85155
return $envelope;
86156
}

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
1919
use Symfony\Component\Messenger\Envelope;
2020
use Symfony\Component\Messenger\Exception\TransportException;
21+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2223

2324
/**
@@ -118,4 +119,47 @@ public function testItThrowsATransportExceptionIfItCannotSendTheMessage()
118119
$sender = new AmqpSender($connection, $serializer);
119120
$sender->send($envelope);
120121
}
122+
123+
public function testTransportMessageIdStampIsCreatedIfMessageIdIsSet()
124+
{
125+
$id = '01946fcb-4bcb-7aa7-9727-dac1c0374443';
126+
$stamp = new AmqpStamp(null, \AMQP_NOPARAM, ['message_id' => $id]);
127+
128+
$envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp);
129+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
130+
131+
$serializer = $this->createMock(SerializerInterface::class);
132+
$serializer->method('encode')->with($envelope)->willReturn($encoded);
133+
134+
$connection = $this->createMock(Connection::class);
135+
136+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
137+
138+
$sender = new AmqpSender($connection, $serializer);
139+
$returnedEnvelope = $sender->send($envelope);
140+
141+
$transportMessageIdStamp = $returnedEnvelope->last(TransportMessageIdStamp::class);
142+
$this->assertSame($id, $transportMessageIdStamp->getId());
143+
}
144+
145+
public function testTransportMessageIdStampIsNotCreatedIfMessageIdIsNotSet()
146+
{
147+
$stamp = new AmqpStamp(null, \AMQP_NOPARAM, []);
148+
149+
$envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp);
150+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
151+
152+
$serializer = $this->createMock(SerializerInterface::class);
153+
$serializer->method('encode')->with($envelope)->willReturn($encoded);
154+
155+
$connection = $this->createMock(Connection::class);
156+
157+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
158+
159+
$sender = new AmqpSender($connection, $serializer);
160+
$returnedEnvelope = $sender->send($envelope);
161+
162+
$transportMessageIdStamp = $returnedEnvelope->last(TransportMessageIdStamp::class);
163+
$this->assertNull($transportMessageIdStamp);
164+
}
121165
}

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1717
use Symfony\Component\Messenger\Exception\TransportException;
18+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1819
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -84,6 +85,12 @@ private function getEnvelope(string $queueName): iterable
8485
throw $exception;
8586
}
8687

88+
if (null !== $amqpEnvelope->getMessageId()) {
89+
$envelope = $envelope
90+
->withoutAll(TransportMessageIdStamp::class)
91+
->with(new TransportMessageIdStamp($amqpEnvelope->getMessageId()));
92+
}
93+
8794
yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
8895
}
8996

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\Messenger\Exception\TransportException;
1616
use Symfony\Component\Messenger\Stamp\DelayStamp;
1717
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
18+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1819
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1920
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2021
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -54,6 +55,10 @@ public function send(Envelope $envelope): Envelope
5455
}
5556
}
5657

58+
if ($amqpStamp instanceof AmqpStamp && isset($amqpStamp->getAttributes()['message_id'])) {
59+
$envelope = $envelope->with(new TransportMessageIdStamp($amqpStamp->getAttributes()['message_id']));
60+
}
61+
5762
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
5863
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
5964
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(

0 commit comments

Comments
 (0)
0