8000 feature #36864 [Messenger] Ability to distinguish retry and delay act… · symfony/symfony@649d115 · GitHub
[go: up one dir, main page]

Skip to content

Commit 649d115

Browse files
committed
feature #36864 [Messenger] Ability to distinguish retry and delay actions (theravel)
This PR was squashed before being merged into the 5.3-dev branch. Discussion ---------- [Messenger] Ability to distinguish retry and delay actions Added ability to distinguish retry and delay actions so that different "x-dead-letter-exchange" exchange name will be used in different scenarios. | Q | A | ------------- | --- | Branch? | 5.x | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | - | License | MIT | Doc PR | This is a bug which existed since v4.4. The following scenario is possible: - There are two queues: `A` and `B`, both are bound to the same routing key via "topic" exchange (two different applications for example). - A message is published to this routing key to "topic" exchange. - Consumer of queue `A` handles it correctly and acknowledges the message. - Consumer of queue `B` throws and exception and message goes to retry (for example to queue `delay_delays_key_5`). - Once message expired in `delay_delays_key_5`, it is delivered again to both `A` and `B` (**again** consumed by consumer `A`). Expected: behavior of consumer `B` should not cause message duplication to queue `A`. It is required to make a change of name of temporary delay queue (otherwise "delay" and "retry" queues have incompatible declaration arguments). I left `queue_name_pattern` as is to keep settings of connection backward compatible, but changed internals of queue name construction. Commits ------- 417aaab [Messenger] Ability to distinguish retry and delay actions
2 parents 33bfb3d + 417aaab commit 649d115

File tree

6 files changed

+149
-50
lines changed

6 files changed

+149
-50
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ CHANGELOG
66

77
* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
88
* `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues.
9+
* Add ability to distinguish retry and delay actions
910

1011
5.2.0
1112
-----

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,45 @@ public function testRetryAndDelay()
140140
$receiver->ack($envelope);
141141
}
142142

143+
public function testRetryAffectsOnlyOriginalQueue()
144+
{
145+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), [
146+
'exchange' => [
147+
'name' => 'messages_topic',
148+
'type' => 'topic',
149+
'default_publish_routing_key' => 'topic_routing_key',
150+
],
151+
'queues' => [
152+
'A' => ['binding_keys' => ['topic_routing_key']],
153+
'B' => ['binding_keys' => ['topic_routing_key']],
154+
],
155+
]);
156+
$connection->setup();
157+
$connection->purgeQueues();
158+
159+
$serializer = $this->createSerializer();
160+
$sender = new AmqpSender($connection, $serializer);
161+
$receiver = new AmqpReceiver($connection, $serializer);
162+
163+
// initial delivery: should receive in both queues
164+
$sender->send(new Envelope(new DummyMessage('Payload')));
165+
166+
$receivedEnvelopes = $this->receiveWithQueueName($receiver);
167+
$this->assertCount(2, $receivedEnvelopes);
168+
$this->assertArrayHasKey('A', $receivedEnvelopes);
169+
$this->assertArrayHasKey('B', $receivedEnvelopes);
170+
171+
// retry: should receive in only "A" queue
172+
$retryEnvelope = $receivedEnvelopes['A']
173+
->with(new DelayStamp(10))
174+
->with(new RedeliveryStamp(1));
175+
$sender->send($retryEnvelope);
176+
177+
$retriedEnvelopes = $this->receiveWithQueueName($receiver);
178+
$this->assertCount(1, $retriedEnvelopes);
179+
$this->assertArrayHasKey('A', $retriedEnvelopes);
180+
}
181+
143182
public function testItReceivesSignals()
144183
{
145184
$serializer = $this->createSerializer();
@@ -255,4 +294,19 @@ private function receiveEnvelopes(ReceiverInterface $receiver, int $timeout): ar
255294

256295
return $envelopes;
257296
}
297+
298+
private function receiveWithQueueName(AmqpReceiver $receiver)
299+
{
300+
// let RabbitMQ receive messages
301+
usleep(100 * 1000); // 100ms
302+
303+
$receivedEnvelopes = [];
304+
foreach ($receiver->get() as $envelope) {
305+
$queueName = $envelope->last(AmqpReceivedStamp::class)->getQueueName();
306+
$receivedEnvelopes[$queueName] = $envelope;
307+
$receiver->ack($envelope);
308+
}
309+
310+
return $receivedEnvelopes;
311+
}
258312
}

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 58 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -493,36 +493,30 @@ public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
493493

494494
public function testItDelaysTheMessage()
495495
{
496-
$amqpConnection = $this->createMock(\AMQPConnection::class);
497-
$amqpChannel = $this->createMock(\AMQPChannel::class);
498-
499-
$factory = $this->createMock(AmqpFactory::class);
500-
$f 10000 actory->method('createConnection')->willReturn($amqpConnection);
501-
$factory->method('createChannel')->willReturn($amqpChannel);
502-
$factory->method('createQueue')->will($this->onConsecutiveCalls(
503-
$this->createMock(\AMQPQueue::class),
504-
$delayQueue = $this->createMock(\AMQPQueue::class)
505-
));
506-
$factory->method('createExchange')->will($this->onConsecutiveCalls(
507-
$this->createMock(\AMQPExchange::class),
508-
$delayExchange = $this->createMock(\AMQPExchange::class)
509-
));
496+
$delayExchange = $this->createMock(\AMQPExchange::class);
497+
$delayExchange->expects($this->once())
498+
->method('publish')
499+
->with('{}', 'delay_messages__5000_delay', AMQP_NOPARAM, [
500+
'headers' => ['x-some-headers' => 'foo'],
501+
'delivery_mode' => 2,
502+
'timestamp' => time(),
503+
]);
504+
$connection = $this->createDelayOrRetryConnection($delayExchange, self::DEFAULT_EXCHANGE_NAME, 'delay_messages__5000_delay');
510505

511-
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
512-
$delayQueue->expects($this->once())->method('setArguments')->with([
513-
'x-message-ttl' => 5000,
514-
'x-expires' => 5000 + 10000,
515-
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
516-
'x-dead-letter-routing-key' => '',
517-
]);
518-
519-
$delayQueue->expects($this->once())->method('declareQueue');
520-
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
506+
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
507+
}
521508

522-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', \AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, 'timestamp' => time()]);
509+
public function testItRetriesTheMessage()
510+
{
511+
$delayExchange = $this->createMock(\AMQPExchange::class);
512+
$delayExchange->expects($this->once())
513+
->method('publish')
514+
->with('{}', 'delay_messages__5000_retry', AMQP_NOPARAM);
515+
$connection = $this->createDelayOrRetryConnection($delayExchange, '', 'delay_messages__5000_retry');
523516

524-
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
525-
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
517+
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
518+
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, null, '');
519+
$connection->publish('{}', [], 5000, $amqpStamp);
526520
}
527521

528522
public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
@@ -550,7 +544,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
550544

551545
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
552546

553-
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
547+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000_delay');
554548
$delayQueue->expects($this->once())->method('setArguments')->with([
555549
'x-message-ttl' => 120000,
556550
'x-expires' => 120000 + 10000,
@@ -559,9 +553,9 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
559553
]);
560554

561555
$delayQueue->expects($this->once())->method('declareQueue');
562-
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
556+
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000_delay');
563557

564-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
558+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
565559
$connection->publish('{}', [], 120000);
566560
}
567561

@@ -690,7 +684,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
690684

691685
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
692686

693-
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
687+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000_delay');
694688
$delayQueue->expects($this->once())->method('setArguments')->with([
695689
'x-message-ttl' => 120000,
696690
'x-expires' => 120000 + 10000,
@@ -699,9 +693,9 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
699693
]);
700694

701695
$delayQueue->expects($this->once())->method('declareQueue');
702-
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
696+
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000_delay');
703697

704-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
698+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
705699
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
706700
}
707701

@@ -769,6 +763,37 @@ public function testItCanPublishAndWaitForConfirmation()
769763
$connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory);
770764
$connection->publish('body');
771765
}
766+
767+
private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
768+
{
769+
$amqpConnection = $this->createMock(\AMQPConnection::class);
770+
$amqpChannel = $this->createMock(\AMQPChannel::class);
771+
772+
$factory = $this->createMock(AmqpFactory::class);
773+
$factory->method('createConnection')->willReturn($amqpConnection);
774+
$factory->method('createChannel')->willReturn($amqpChannel);
775+
$factory->method('createQueue')->will($this->onConsecutiveCalls(
776+
$this->createMock(\AMQPQueue::class),
777+
$delayQueue = $this->createMock(\AMQPQueue::class)
778+
));
779+
$factory->method('createExchange')->will($this->onConsecutiveCalls(
780+
$this->createMock(\AMQPExchange::class),
781+
$delayExchange
782+
));
783+
784+
$delayQueue->expects($this->once())->method('setName')->with($delayQueueName);
785+
$delayQueue->expects($this->once())->method('setArguments')->with([
786+
'x-message-ttl' => 5000,
787+
'x-expires' => 5000 + 10000,
788+
'x-dead-letter-exchange' => $deadLetterExchangeName,
789+
'x-dead-letter-routing-key' => '',
790+
]);
791+
792+
$delayQueue->expects($this->once())->method('declareQueue');
793+
$delayQueue->expec 10000 ts($this->once())->method('bind')->with('delays', $delayQueueName);
794+
795+
return Connection::fromDsn('amqp://localhost', [], $factory);
796+
}
772797
}
773798

774799
class TestAmqpFactory extends AmqpFactory

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\TransportException;
1616
use Symfony\Component\Messenger\Stamp\DelayStamp;
17+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1718
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1819
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -58,7 +59,11 @@ public function send(Envelope $envelope): Envelope
5859

5960
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
6061
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
61-
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp);
62+
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
63+
$amqpReceivedStamp->getAmqpEnvelope(),
64+
$amqpStamp,
65+
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null
66+
);
6267
}
6368

6469
try {

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ final class AmqpStamp implements NonSendableStampInterface
2222
private $routingKey;
2323
private $flags;
2424
private $attributes;
25+
private $isRetryAttempt = false;
2526

2627
public function __construct(string $routingKey = null, int $flags = \AMQP_NOPARAM, array $attributes = [])
2728
{
@@ -45,7 +46,7 @@ public function getAttributes(): array
4546
return $this->attributes;
4647
}
4748

48-
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self
49+
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null): self
4950
{
5051
$attr = $previousStamp->attributes ?? [];
5152

@@ -62,7 +63,19 @@ public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self
6263
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
6364
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();
6465

65-
return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? \AMQP_NOPARAM, $attr);
66+
if (null === $retryRoutingKey) {
67+
$stamp = new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
68+
} else {
69+
$stamp = new self($retryRoutingKey, $previousStamp->flags ?? AMQP_NOPARAM, $attr);
70+
$stamp->isRetryAttempt = true;
71+
}
72+
73+
return $stamp;
74+
}
75+
76+
public function isRetryAttempt(): bool
77+
{
78+
return $this->isRetryAttempt;
6679
}
6780

6881
public static function createWithAttributes(array $attributes, self $previousStamp = null): self

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -323,13 +323,14 @@ public function countMessagesInQueues(): int
323323
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
324324
{
325325
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
326+
$isRetryAttempt = $amqpStamp ? $amqpStamp->isRetryAttempt() : false;
326327

327-
$this->setupDelay($delay, $routingKey);
328+
$this->setupDelay($delay, $routingKey, $isRetryAttempt);
328329

329330
$this->publishOnExchange(
330331
$this->getDelayExchange(),
331332
$body,
332-
$this->getRoutingKeyForDelay($delay, $routingKey),
333+
$this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
333334
$headers,
334335
$amqpStamp
335336
);
@@ -354,15 +355,15 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
354355
}
355356
}
356357

357-
private function setupDelay(int $delay, ?string $routingKey)
358+
private function setupDelay(int $delay, ?string $routingKey, bool $isRetryAttempt)
358359
{
359360
if ($this->autoSetupDelayExchange) {
360361
$this->setupDelayExchange();
361362
}
362363

363-
$queue = $this->createDelayQueue($delay, $routingKey);
364+
$queue = $this->createDelayQueue($delay, $routingKey, $isRetryAttempt);
364365
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
365-
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
366+
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
366367
}
367368

368369
private function getDelayExchange(): \AMQPExchange
@@ -386,21 +387,19 @@ private function getDelayExchange(): \AMQPExchange
386387
* which is the original exchange, resulting on it being put back into
387388
* the original queue.
388389
*/
389-
private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue
390+
private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetryAttempt): \AMQPQueue
390391
{
391392
$queue = $this->amqpFactory->createQueue($this->channel());
392-
$queue->setName(str_replace(
393-
['%delay%', '%exchange_name%', '%routing_key%'],
394-
[$delay, $this->exchangeOptions['name'], $routingKey ?? ''],
395-
$this->connectionOptions['delay']['queue_name_pattern']
396-
));
393+
$queue->setName($this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
397394
$queue->setFlags(\AMQP_DURABLE);
398395
$queue->setArguments([
399396
'x-message-ttl' => $delay,
400397
// delete the delay queue 10 seconds after the message expires
401398
// publishing another message redeclares the queue which renews the lease
402399
'x-expires' => $delay + 10000,
403-
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
400+
// message should be broadcasted to all consumers during delay, but to only one queue during retry
401+
// empty name is default direct exchange
402+
'x-dead-letter-exchange' => $isRetryAttempt ? '' : $this->exchangeOptions['name'],
404403
// after being released from to DLX, make sure the original routing key will be used
405404
// we must use an empty string instead of null for the argument to be picked up
406405
'x-dead-letter-routing-key' => $routingKey ?? '',
@@ -409,13 +408,15 @@ private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue
409408
return $queue;
410409
}
411410

412-
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string
411+
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string
413412
{
413+
$action = $isRetryAttempt ? '_retry' : '_delay';
414+
414415
return str_replace(
415416
['%delay%', '%exchange_name%', '%routing_key%'],
416417
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
417418
$this->connectionOptions['delay']['queue_name_pattern']
418-
);
419+
).$action;
419420
}
420421

421422
/**

0 commit comments

Comments
 (0)
0