From 281540e005b68e2b5044df2d2e451da4a4055a4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Zaj=C4=85c?= Date: Mon, 17 Aug 2020 22:31:42 +0200 Subject: [PATCH] [Messenger] Add message timestamp to amqp connection --- .../Amqp/Tests/Transport/ConnectionTest.php | 20 ++++++++++--------- .../Bridge/Amqp/Transport/Connection.php | 1 + 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php index 81b8e45d858f9..103d3d0c61a97 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php @@ -21,6 +21,8 @@ /** * @requires extension amqp + * + * @group time-sensitive */ class ConnectionTest extends TestCase { @@ -266,7 +268,7 @@ public function testItSetupsTheConnectionWithDefaults() ); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue->expects($this->once())->method('declareQueue'); $amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null); @@ -289,7 +291,7 @@ public function testItSetupsTheConnection() $factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1)); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); + $amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue0->expects($this->once())->method('declareQueue'); $amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, 'binding_key0'], @@ -326,7 +328,7 @@ public function testBindingArguments() $factory->method('createQueue')->willReturn($amqpQueue); $amqpExchange->expects($this->once())->method('declareExchange'); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $amqpQueue->expects($this->once())->method('declareQueue'); $amqpQueue->expects($this->exactly(1))->method('bind')->withConsecutive( [self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all']] @@ -439,7 +441,7 @@ public function testItDelaysTheMessage() $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, 'timestamp' => time()]); $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('{}', ['x-some-headers' => 'foo'], 5000); @@ -481,7 +483,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs() $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $connection->publish('{}', [], 120000); } @@ -513,7 +515,7 @@ public function testAmqpStampHeadersAreUsed() $amqpExchange = $this->createMock(\AMQPExchange::class) ); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2]); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2, 'timestamp' => time()]); $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']])); @@ -528,7 +530,7 @@ public function testAmqpStampDelireryModeIsUsed() $amqpExchange = $this->createMock(\AMQPExchange::class) ); - $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1]); + $amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1, 'timestamp' => time()]); $connection = Connection::fromDsn('amqp://localhost', [], $factory); $connection->publish('body', [], 0, new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 1])); @@ -600,7 +602,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument $delayQueue->expects($this->once())->method('declareQueue'); $delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000'); - $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]); + $delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]); $connection->publish('{}', [], 120000, new AmqpStamp('routing_key')); } @@ -617,7 +619,7 @@ public function testItCanPublishWithCustomFlagsAndAttributes() 'body', 'routing_key', AMQP_IMMEDIATE, - ['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]] + ['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class], 'timestamp' => time()] ); $connection = Connection::fromDsn('amqp://localhost', [], $factory); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 97d2f7672f3b8..7b9f49fa3a9a0 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -317,6 +317,7 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string $attributes = $amqpStamp ? $amqpStamp->getAttributes() : []; $attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers); $attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2; + $attributes['timestamp'] = $attributes['timestamp'] ?? time(); $exchange->publish( $body,