@@ -493,36 +493,30 @@ public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
493
493
494
494
public function testItDelaysTheMessage ()
495
495
{
496
- $ amqpConnection = $ this ->createMock (\AMQPConnection::class);
497
- $ amqpChannel = $ this ->createMock (\AMQPChannel::class);
498
-
499
- $ factory = $ this ->createMock (AmqpFactory::class);
500
- $ f
10000
actory ->method ('createConnection ' )->willReturn ($ amqpConnection );
501
- $ factory ->method ('createChannel ' )->willReturn ($ amqpChannel );
502
- $ factory ->method ('createQueue ' )->will ($ this ->onConsecutiveCalls (
503
- $ this ->createMock (\AMQPQueue::class),
504
- $ delayQueue = $ this ->createMock (\AMQPQueue::class)
505
- ));
506
- $ factory ->method ('createExchange ' )->will ($ this ->onConsecutiveCalls (
507
- $ this ->createMock (\AMQPExchange::class),
508
- $ delayExchange = $ this ->createMock (\AMQPExchange::class)
509
- ));
496
+ $ delayExchange = $ this ->createMock (\AMQPExchange::class);
497
+ $ delayExchange ->expects ($ this ->once ())
498
+ ->method ('publish ' )
499
+ ->with ('{} ' , 'delay_messages__5000_delay ' , AMQP_NOPARAM , [
500
+ 'headers ' => ['x-some-headers ' => 'foo ' ],
501
+ 'delivery_mode ' => 2 ,
502
+ 'timestamp ' => time (),
503
+ ]);
504
+ $ connection = $ this ->createDelayOrRetryConnection ($ delayExchange , self ::DEFAULT_EXCHANGE_NAME , 'delay_messages__5000_delay ' );
510
505
511
- $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_messages__5000 ' );
512
- $ delayQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
513
- 'x-message-ttl ' => 5000 ,
514
- 'x-expires ' => 5000 + 10000 ,
515
- 'x-dead-letter-exchange ' => self ::DEFAULT_EXCHANGE_NAME ,
516
- 'x-dead-letter-routing-key ' => '' ,
517
- ]);
518
-
519
- $ delayQueue ->expects ($ this ->once ())->method ('declareQueue ' );
520
- $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' , 'delay_messages__5000 ' );
506
+ $ connection ->publish ('{} ' , ['x-some-headers ' => 'foo ' ], 5000 );
507
+ }
521
508
522
- $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'delay_messages__5000 ' , \AMQP_NOPARAM , ['headers ' => ['x-some-headers ' => 'foo ' ], 'delivery_mode ' => 2 , 'timestamp ' => time ()]);
509
+ public function testItRetriesTheMessage ()
510
+ {
511
+ $ delayExchange = $ this ->createMock (\AMQPExchange::class);
512
+ $ delayExchange ->expects ($ this ->once ())
513
+ ->method ('publish ' )
514
+ ->with ('{} ' , 'delay_messages__5000_retry ' , AMQP_NOPARAM );
515
+ $ connection = $ this ->createDelayOrRetryConnection ($ delayExchange , '' , 'delay_messages__5000_retry ' );
523
516
524
- $ connection = Connection::fromDsn ('amqp://localhost ' , [], $ factory );
525
- $ connection ->publish ('{} ' , ['x-some-headers ' => 'foo ' ], 5000 );
517
+ $ amqpEnvelope = $ this ->createMock (\AMQPEnvelope::class);
518
+ $ amqpStamp = AmqpStamp::createFromAmqpEnvelope ($ amqpEnvelope , null , '' );
519
+ $ connection ->publish ('{} ' , [], 5000 , $ amqpStamp );
526
520
}
527
521
528
522
public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs ()
@@ -550,7 +544,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
550
544
551
545
$ connection = Connection::fromDsn ('amqp://localhost ' , $ connectionOptions , $ factory );
552
546
553
- $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_messages__120000 ' );
547
+ $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_messages__120000_delay ' );
554
548
$ delayQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
555
549
'x-message-ttl ' => 120000 ,
556
550
'x-expires ' => 120000 + 10000 ,
@@ -559,9 +553,9 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
559
553
]);
560
554
561
555
$ delayQueue ->expects ($ this ->once ())->method ('declareQueue ' );
562
- $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' , 'delay_messages__120000 ' );
556
+ $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' , 'delay_messages__120000_delay ' );
563
557
564
- $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'delay_messages__120000 ' , \AMQP_NOPARAM , ['headers ' => [], 'delivery_mode ' => 2 , 'timestamp ' => time ()]);
558
+ $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'delay_messages__120000_delay ' , \AMQP_NOPARAM , ['headers ' => [], 'delivery_mode ' => 2 , 'timestamp ' => time ()]);
565
559
$ connection ->publish ('{} ' , [], 120000 );
566
560
}
567
561
@@ -690,7 +684,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
690
684
691
685
$ connection = Connection::fromDsn ('amqp://localhost ' , $ connectionOptions , $ factory );
692
686
693
- $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_messages_routing_key_120000 ' );
687
+ $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_messages_routing_key_120000_delay ' );
694
688
$ delayQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
695
689
'x-message-ttl ' => 120000 ,
696
690
'x-expires ' => 120000 + 10000 ,
@@ -699,9 +693,9 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
699
693
]);
700
694
701
695
$ delayQueue ->expects ($ this ->once ())->method ('declareQueue ' );
702
- $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' , 'delay_messages_routing_key_120000 ' );
696
+ $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delays ' , 'delay_messages_routing_key_120000_delay ' );
703
697
704
- $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'delay_messages_routing_key_120000 ' , \AMQP_NOPARAM , ['headers ' => [], 'delivery_mode ' => 2 , 'timestamp ' => time ()]);
698
+ $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'delay_messages_routing_key_120000_delay ' , \AMQP_NOPARAM , ['headers ' => [], 'delivery_mode ' => 2 , 'timestamp ' => time ()]);
705
699
$ connection ->publish ('{} ' , [], 120000 , new AmqpStamp ('routing_key ' ));
706
700
}
707
701
@@ -769,6 +763,37 @@ public function testItCanPublishAndWaitForConfirmation()
769
763
$ connection = Connection::fromDsn ('amqp://localhost?confirm_timeout=0.5 ' , [], $ factory );
770
764
$ connection ->publish ('body ' );
771
765
}
766
+
767
+ private function createDelayOrRetryConnection (\AMQPExchange $ delayExchange , string $ deadLetterExchangeName , string $ delayQueueName ): Connection
768
+ {
769
+ $ amqpConnection = $ this ->createMock (\AMQPConnection::class);
770
+ $ amqpChannel = $ this ->createMock (\AMQPChannel::class);
771
+
772
+ $ factory = $ this ->createMock (AmqpFactory::class);
773
+ $ factory ->method ('createConnection ' )->willReturn ($ amqpConnection );
774
+ $ factory ->method ('createChannel ' )->willReturn ($ amqpChannel );
775
+ $ factory ->method ('createQueue ' )->will ($ this ->onConsecutiveCalls (
776
+ $ this ->createMock (\AMQPQueue::class),
777
+ $ delayQueue = $ this ->createMock (\AMQPQueue::class)
778
+ ));
779
+ $ factory ->method ('createExchange ' )->will ($ this ->onConsecutiveCalls (
780
+ $ this ->createMock (\AMQPExchange::class),
781
+ $ delayExchange
782
+ ));
783
+
784
+ $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ($ delayQueueName );
785
+ $ delayQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
786
+ 'x-message-ttl ' => 5000 ,
787
+ 'x-expires ' => 5000 + 10000 ,
788
+ 'x-dead-letter-exchange ' => $ deadLetterExchangeName ,
789
+ 'x-dead-letter-routing-key ' => '' ,
790
+ ]);
791
+
792
+ $ delayQueue ->expects ($ this ->once ())->method ('declareQueue ' );
793
+ $ delayQueue ->expec
10000
ts ($ this ->once ())->method ('bind ' )->with ('delays ' , $ delayQueueName );
794
+
795
+ return Connection::fromDsn ('amqp://localhost ' , [], $ factory );
796
+ }
772
797
}
773
798
774
799
class TestAmqpFactory extends AmqpFactory
0 commit comments