8000 bug #34134 [Messenger] fix retry of messages losing the routing key a… · symfony/symfony@e057a9c · GitHub
[go: up one dir, main page]

Skip to content

Commit e057a9c

Browse files
committed
bug #34134 [Messenger] fix retry of messages losing the routing key and properties (Tobion)
This PR was merged into the 4.3 branch. Discussion ---------- [Messenger] fix retry of messages losing the routing key and properties | Q | A | ------------- | --- | Branch? | 4.3 | Bug fix? | yes | New feature? | no <!-- please update src/**/CHANGELOG.md files --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tickets | Fix #32994 <!-- prefix each issue number with "Fix #", if any --> | License | MIT | Doc PR | Messages sent for retry in rabbitmq lost the routing key and properties like the priority. Now we read those original properties and sent the retry message with the same properties (unless those properties have already been set manually before). Commits ------- 75c674d [Messenger] fix retry of messages losing the routing key and properties
2 parents 142bddd + 75c674d commit e057a9c

File tree

4 files changed

+74
-7
lines changed

4 files changed

+74
-7
lines changed

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpStampTest.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,40 @@ public function testFlagsAndAttributes()
3434
$this->assertSame(AMQP_DURABLE, $stamp->getFlags());
3535
$this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes());
3636
}
37+
38+
public function testCreateFromAmqpEnvelope()
39+
{
40+
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
41+
$amqpEnvelope->method('getRoutingKey')->willReturn('routingkey');
42+
$amqpEnvelope->method('getDeliveryMode')->willReturn(2);
43+
$amqpEnvelope->method('getPriority')->willReturn(5);
44+
$amqpEnvelope->method('getAppId')->willReturn('appid');
45+
46+
$stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope);
47+
48+
$this->assertSame($amqpEnvelope->getRoutingKey(), $stamp->getRoutingKey());
49+
$this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']);
50+
$this->assertSame($amqpEnvelope->getPriority(), $stamp->getAttributes()['priority']);
51+
$this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']);
52+
$this->assertSame(AMQP_NOPARAM, $stamp->getFlags());
53+
}
54+
55+
public function testCreateFromAmqpEnvelopeWithPreviousStamp()
56+
{
57+
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
58+
$amqpEnvelope->method('getRoutingKey')->willReturn('routingkey');
59+
$amqpEnvelope->method('getDeliveryMode')->willReturn(2);
60+
$amqpEnvelope->method('getPriority')->willReturn(5);
61+
$amqpEnvelope->method('getAppId')->willReturn('appid');
62+
63+
$previousStamp = new AmqpStamp('otherroutingkey', AMQP_MANDATORY, ['priority' => 8]);
64+
65+
$stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, $previousStamp);
66+
67+
$this->assertSame('otherroutingkey', $stamp->getRoutingKey());
68+
$this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']);
69+
$this->assertSame(8, $stamp->getAttributes()['priority']);
70+
$this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']);
71+
$this->assertSame(AMQP_MANDATORY, $stamp->getFlags());
72+
}
3773
}

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,22 @@ public function send(Envelope $envelope): Envelope
4747
$delayStamp = $envelope->last(DelayStamp::class);
4848
$delay = $delayStamp ? $delayStamp->getDelay() : 0;
4949

50+
/** @var AmqpStamp|null $amqpStamp */
5051
$amqpStamp = $envelope->last(AmqpStamp::class);
5152
if (isset($encodedMessage['headers']['Content-Type'])) {
5253
$contentType = $encodedMessage['headers']['Content-Type'];
5354
unset($encodedMessage['headers']['Content-Type']);
5455

55-
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
56-
57-
if (!isset($attributes['content_type'])) {
58-
$attributes['content_type'] = $contentType;
59-
60-
$amqpStamp = new AmqpStamp($amqpStamp ? $amqpStamp->getRoutingKey() : null, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, $attributes);
56+
if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) {
57+
$amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType], $amqpStamp);
6158
}
6259
}
6360

61+
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
62+
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
63+
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp);
64+
}
65+
6466
try {
6567
$this->connection->publish(
6668
$encodedMessage['body'],

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpStamp.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,33 @@ public function getAttributes(): array
4646
{
4747
return $this->attributes;
4848
}
49+
50+
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self
51+
{
52+
$attr = $previousStamp->attributes ?? [];
53+
54+
$attr['headers'] = $attr['headers'] ?? $amqpEnvelope->getHeaders();
55+
$attr['content_type'] = $attr['content_type'] ?? $amqpEnvelope->getContentType();
56+
$attr['content_encoding'] = $attr['content_encoding'] ?? $amqpEnvelope->getContentEncoding();
57+
$attr['delivery_mode'] = $attr['delivery_mode'] ?? $amqpEnvelope->getDeliveryMode();
58+
$attr['priority'] = $attr['priority'] ?? $amqpEnvelope->getPriority();
59+
$attr['timestamp'] = $attr['timestamp'] ?? $amqpEnvelope->getTimestamp();
60+
$attr['app_id'] = $attr['app_id'] ?? $amqpEnvelope->getAppId();
61+
$attr['message_id'] = $attr['message_id'] ?? $amqpEnvelope->getMessageId();
62+
$attr['user_id'] = $attr['user_id'] ?? $amqpEnvelope->getUserId();
63+
$attr['expiration'] = $attr['expiration'] ?? $amqpEnvelope->getExpiration();
64+
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
65+
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();
66+
67+
return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
68+
}
69+
70+
public static function createWithAttributes(array $attributes, self $previousStamp = null): self
71+
{
72+
return new self(
73+
$previousStamp->routingKey ?? null,
74+
$previousStamp->flags ?? AMQP_NOPARAM,
75+
array_merge($previousStamp->attributes ?? [], $attributes)
76+
);
77+
}
4978
}

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp
225225
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
226226
{
227227
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
228-
$attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []);
228+
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
229229

230230
$exchange->publish(
231231
$body,

0 commit comments

Comments
 (0)
0