10000 [Messenger] Add message timestamp to amqp connection · symfony/symfony@9c3989b · GitHub
[go: up one dir, main page]

Skip to content

Commit 9c3989b

Browse files
author
Bartłomiej Zając
committed
[Messenger] Add message timestamp to amqp connection
1 parent e1cfbd2 commit 9c3989b

File tree

2 files changed

+13
-9
lines changed

2 files changed

+13
-9
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Amqp\Tests\Transport;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Bridge\PhpUnit\ClockMock;
1516
use Symfony\Bridge\PhpUnit\ExpectDeprecationTrait;
1617
use Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage;
1718
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpFactory;
@@ -21,6 +22,8 @@
2122

2223
/**
2324
* @requires extension amqp
25+
*
26+
* @group time-sensitive
2427
*/
2528
class ConnectionTest extends TestCase
2629
{
@@ -266,7 +269,7 @@ public function testItSetupsTheConnectionWithDefaults()
266269
);
267270

268271
$amqpExchange->expects($this->once())->method('declareExchange');
269-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
272+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
270273
$amqpQueue->expects($this->once())->method('declareQueue');
271274
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
272275

@@ -289,7 +292,7 @@ public function testItSetupsTheConnection()
289292
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
290293

291294
$amqpExchange->expects($this->once())->method('declareExchange');
292-
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
295+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
293296
$amqpQueue0->expects($this->once())->method('declareQueue');
294297
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
295298
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
@@ -326,7 +329,7 @@ public function testBindingArguments()
326329
$factory->method('createQueue')->willReturn($amqpQueue);
327330

328331
$amqpExchange->expects($this->once())->method('declareExchange');
329-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
332+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
330333
$amqpQueue->expects($this->once())->method('declareQueue');
331334
$amqpQueue->expects($this->exactly(1))->method('bind')->withConsecutive(
332335
[self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all']]
@@ -439,7 +442,7 @@ public function testItDelaysTheMessage()
439442
$delayQueue->expects($this->once())->method('declareQueue');
440443
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
441444

442-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]);
445+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, 'timestamp' => time()]);
443446

444447
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
445448
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
@@ -481,7 +484,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
481484
$delayQueue->expects($this->once())->method('declareQueue');
482485
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
483486

484-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
487+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
485488
$connection->publish('{}', [], 120000);
486489
}
487490

@@ -513,7 +516,7 @@ public function testAmqpStampHeadersAreUsed()
513516
$amqpExchange = $this->createMock(\AMQPExchange::class)
514517
);
515518

516-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2]);
519+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2, 'timestamp' => time()]);
517520

518521
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
519522
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
@@ -528,7 +531,7 @@ public function testAmqpStampDelireryModeIsUsed()
528531
$amqpExchange = $this->createMock(\AMQPExchange::class)
529532
);
530533

531-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1]);
534+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1, 'timestamp' => time()]);
532535

533536
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
534537
$connection->publish('body', [], 0, new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 1]));
@@ -600,7 +603,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
600603
$delayQueue->expects($this->once())->method('declareQueue');
601604
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
602605

603-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
606+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
604607
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
605608
}
606609

@@ -617,7 +620,7 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
617620
'body',
618621
'routing_key',
619622
AMQP_IMMEDIATE,
620-
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
623+
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class], 'timestamp' => time()]
621624
);
622625

623626
$connection = Connection::fromDsn('amqp://localhost', [], $factory);

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
317317
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
318318
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
319319
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
320+
$attributes['timestamp'] = $attributes['timestamp'] ?? time();
320321

321322
$exchange->publish(
322323
$body,

0 commit comments

Comments
 (0)
0