8000 feature #37867 [Messenger] Add message timestamp to amqp connection (… · symfony/symfony@e7dfa34 · GitHub
[go: up one dir, main page]

Skip to content

Commit e7dfa34

Browse files
committed
feature #37867 [Messenger] Add message timestamp to amqp connection (Bartłomiej Zając)
This PR was merged into the 5.2-dev branch. Discussion ---------- [Messenger] Add message timestamp to amqp connection | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | none | License | MIT | Doc PR | none Add default timestamp to amqp transport message. It is useful when you logging each message, and your consumer is down, you alway get right message time when start consume again. Commits ------- 281540e [Messenger] Add message timestamp to amqp connection
2 parents 09ff501 + 281540e commit e7dfa34

File tree

2 files changed

+12
-9
lines changed

2 files changed

+12
-9
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
/**
2323
* @requires extension amqp
24+
*
25+
* @group time-sensitive
2426
*/
2527
class ConnectionTest extends TestCase
2628
{
@@ -266,7 +268,7 @@ public function testItSetupsTheConnectionWithDefaults()
266268
);
267269

268270
$amqpExchange->expects($this->once())->method('declareExchange');
269-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
271+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
270272
$amqpQueue->expects($this->once())->method('declareQueue');
271273
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
272274

@@ -289,7 +291,7 @@ public function testItSetupsTheConnection()
289291
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
290292

291293
$amqpExchange->expects($this->once())->method('declareExchange');
292-
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
294+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
293295
$amqpQueue0->expects($this->once())->method('declareQueue');
294296
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
295297
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
@@ -326,7 +328,7 @@ public function testBindingArguments()
326328
$factory->method('createQueue')->willReturn($amqpQueue);
327329

328330
$amqpExchange->expects($this->once())->method('declareExchange');
329-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
331+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
330332
$amqpQueue->expects($this->once())->method('declareQueue');
331333
$amqpQueue->expects($this->exactly(1))->method('bind')->withConsecutive(
332334
[self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all']]
@@ -439,7 +441,7 @@ public function testItDelaysTheMessage()
439441
$delayQueue->expects($this->once())->method('declareQueue');
440442
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
441443

442-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]);
444+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, 'timestamp' => time()]);
443445

444446
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
445447
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
@@ -481,7 +483,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
481483
$delayQueue->expects($this->once())->method('declareQueue');
482484
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
483485

484-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
486+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
485487
$connection->publish('{}', [], 120000);
486488
}
487489

@@ -513,7 +515,7 @@ public function testAmqpStampHeadersAreUsed()
513515
$amqpExchange = $this->createMock(\AMQPExchange::class)
514516
);
515517

516-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2]);
518+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2, 'timestamp' => time()]);
517519

518520
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
519521
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
@@ -528,7 +530,7 @@ public function testAmqpStampDelireryModeIsUse 6D40 d()
528530
$amqpExchange = $this->createMock(\AMQPExchange::class)
529531
);
530532

531-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1]);
533+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1, 'timestamp' => time()]);
532534

533535
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
534536
$connection->publish('body', [], 0, new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 1]));
@@ -600,7 +602,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
600602
$delayQueue->expects($this->once())->method('declareQueue');
601603
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
602604

603-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
605+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
604606
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
605607
}
606608

@@ -617,7 +619,7 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
617619
'body',
618620
'routing_key',
619621
AMQP_IMMEDIATE,
620-
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
622+
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class], 'timestamp' => time()]
621623
);
622624

623625
$connection = Connection::fromDsn('amqp://localhost', [], $factory);

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
317317
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
318318
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
319319
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
320+
$attributes['timestamp'] = $attributes['timestamp'] ?? time();
320321

321322
$exchange->publish(
322323
$body,

0 commit comments

Comments
 (0)
0