8000 [Messenger] expire delay queue and fix auto_setup logic by Tobion · Pull Request #32631 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] expire delay queue and fix auto_setup logic #32631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8000
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public function testSetChannelPrefetchWhenSetup()
);

// makes sure the channel looks connected, so it's not re-created
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
$amqpChannel->expects($this->any())->method('isConnected')->willReturn(true);

$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
Expand All @@ -317,30 +317,57 @@ public function testSetChannelPrefetchWhenSetup()
$connection->setup();
}

public function testItDelaysTheMessage()
public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->w 10000 illReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$amqpQueue = $this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpExchange = $this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));

$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpQueue->expects($this->once())->method('declareQueue');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->expects($this->once())->method('publish');

$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
}

public function testItDelaysTheMessage()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));

$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-expires' => 5000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);
Expand All @@ -358,23 +385,19 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));

$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');

$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
Expand All @@ -383,9 +406,10 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()

$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);
Expand Down Expand Up @@ -467,23 +491,19 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);
$delayQueue = $this->createMock(\AMQPQueue::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$amqpExchange = $this->createMock(\AMQPExchange::class),
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));

$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');

$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
Expand All @@ -492,9 +512,10 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument

$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => 'routing_key',
]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ public function send(Envelope $envelope): Envelope

/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);
$delay = 0;
if (null !== $delayStamp) {
$delay = $delayStamp->getDelay();
}
$delay = $delayStamp ? $delayStamp->getDelay() : 0;

$amqpStamp = $envelope->last(AmqpStamp::class);
if (isset($encodedMessage['headers']['Content-Type'])) {
Expand Down
64 changes: 33 additions & 31 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
{
$this->connectionOptions = array_replace_recursive([
'delay' => [
'routing_key_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
'exchange_name' => 'delay',
'queue_name_pattern' => 'delay_queue_%exchange_name%_%routing_key%_%delay%',
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to change the name because of the new x-expire argument which would otherwise cause a precondition failed error on redeclaring. but the name is an implementation detail and should not matter.

],
], $connectionOptions);
$this->exchangeOptions = $exchangeOptions;
Expand Down Expand Up @@ -93,9 +92,8 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
* * flags: Exchange flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * delay:
* * routing_key_pattern: The pattern of the routing key (Default: "delay_%exchange_name%_%routing_key%_%delay%")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%exchange_name%_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay")
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delay")
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
* * prefetch_count: set channel prefetch count
*/
Expand Down Expand Up @@ -171,20 +169,20 @@ private static function normalizeQueueArguments(array $arguments): array
}

/**
* @param int $delay The delay in milliseconds
*
* @throws \AMQPException
*/
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void
public function publish(string $body, array $headers = [], int $delayInMs = 0, AmqpStamp $amqpStamp = null): void
{
if (0 !== $delay) {
$this->publishWithDelay($body, $headers, $delay, $amqpStamp);
$this->clearWhenDisconnected();

if (0 !== $delayInMs) {
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);

return;
}

if ($this->shouldSetup()) {
$this->setup();
$this->setupExchangeAndQueues();
}

$this->publishOnExchange(
Expand Down Expand Up @@ -213,9 +211,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp
{
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);

if ($this->shouldSetup()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delay must ignore the auto_setup option as explained below

$this->setupDelay($delay, $routingKey);
}
$this->setupDelay($delay, $routingKey);

$this->publishOnExchange(
$this->getDelayExchange(),
Expand All @@ -241,15 +237,12 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string

private function setupDelay(int $delay, ?string $routingKey)
{
if (!$this->channel()->isConnected()) {
$this->clear();
if ($this->shouldSetup()) {
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
}

$this->exchange()->declareExchange(); // setup normal exchange for delay queue to DLX messages to
$this->getDelayExchange()->declareExchange();

$queue = $this->createDelayQueue($delay, $routingKey);
$queue->declareQueue();
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
}

Expand Down Expand Up @@ -283,6 +276,9 @@ private function createDelayQueue(int $delay, ?string $routingKey)
));
$queue->setArguments([
'x-message-ttl' => $delay,
// delete the delay queue 10 seconds after the message expires
// publishing another message redeclares the queue which renews the lease
'x-expires' => $delay + 10000,
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
// after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up
Expand All @@ -297,7 +293,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
return str_replace(
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['routing_key_pattern']
$this->connectionOptions['delay']['queue_name_pattern']
);
}

Expand All @@ -308,8 +304,10 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st
*/
public function get(string $queueName): ?\AMQPEnvelope
{
$this->clearWhenDisconnected();

if ($this->shouldSetup()) {
$this->setup();
$this->setupExchangeAndQueues();
}

try {
Expand All @@ -319,7 +317,7 @@ public function get(string $queueName): ?\AMQPEnvelope
} catch (\AMQPQueueException $e) {
if (404 === $e->getCode() && $this->shouldSetup()) {
// If we get a 404 for the queue, it means we need to setup the exchange & queue.
$this->setup();
$this->setupExchangeAndQueues();

return $this->get();
}
Expand All @@ -342,10 +340,12 @@ public function nack(\AMQPEnvelope $message, string $queueName, int $flags = AMQ

public function setup(): void
{
if (!$this->channel()->isConnected()) {
$this->clear();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should also happen when auto_setup is not enabled to reconnect when necessary. so I moved it outside setup()

}
$this->setupExchangeAndQueues();
$this->getDelayExchange()->declareExchange();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setup is called from messenger:setup-transports which should also create the delay exchange. this way, auto_setup can actually be disabled successfully even when using delays/retries

}

private function setupExchangeAndQueues(): void
{
$this->exchange()->declareExchange();

foreach ($this->queuesOptions as $queueName => $queueConfig) {
Expand Down Expand Up @@ -424,12 +424,14 @@ public function exchange(): \AMQPExchange
return $this->amqpExchange;
}

private function clear(): void
private function clearWhenDisconnected(): void
{
$this->amqpChannel = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
$this->amqpDelayExchange = null;
if (!$this->channel()->isConnected()) {
$this->amqpChannel = null;
$this->amqpQueues = [];
$this->amqpExchange = null;
$this->amqpDelayExchange = null;
}
}

private function shouldSetup(): bool
Expand Down
0