@@ -774,6 +774,73 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn()
774
774
);
775
775
}
776
776
777
+ public function testItCanRetryPublishWhenAMQPConnectionExceptionIsThrown ()
778
+ {
779
+ $ factory = new TestAmqpFactory (
780
+ $ amqpConnection = $ this ->createMock (\AMQPConnection::class),
781
+ $ amqpChannel = $ this ->createMock (\AMQPChannel::class),
782
+ $ amqpQueue = $ this ->createMock (\AMQPQueue::class),
783
+ $ amqpExchange = $ this ->createMock (\AMQPExchange::class)
784
+ );
785
+
786
+ $ amqpExchange ->expects ($ this ->exactly (2 ))
787
+ ->method ('publish ' )
788
+ ->willReturnOnConsecutiveCalls (
789
+ $ this ->throwException (new \AMQPConnectionException ('a socket error occurred ' )),
790
+ null
791
+ );
792
+
793
+ $ connection = Connection::fromDsn ('amqp://localhost ' , [], $ factory );
794
+ $ connection ->publish ('body ' );
795
+ }
796
+
797
+ public function testItCanRetryPublishWithDelayWhenAMQPConnectionExceptionIsThrown ()
798
+ {
799
+ $ factory = new TestAmqpFactory (
800
+ $ amqpConnection = $ this ->createMock (\AMQPConnection::class),
801
+ $ amqpChannel = $ this ->createMock (\AMQPChannel::class),
802
+ $ amqpQueue = $ this ->createMock (\AMQPQueue::class),
803
+ $ amqpExchange = $ this ->createMock (\AMQPExchange::class)
804
+ );
805
+
806
+ $ amqpExchange ->expects ($ this ->exactly (2 ))
807
+ ->method ('publish ' )
808
+ ->willReturnOnConsecutiveCalls (
809
+ $ this ->throwException (new \AMQPConnectionException ('a socket error occurred ' )),
810
+ null
811
+ );
812
+
813
+ $ connection = Connection::fromDsn ('amqp://localhost ' , [], $ factory );
814
+ $ connection ->publish ('body ' , [], 5000 );
815
+ }
816
+
817
+ public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown ()
818
+ {
819
+ $ factory = new TestAmqpFactory (
820
+ $ amqpConnection = $ this ->createMock (\AMQPConnection::class),
821
+ $ amqpChannel = $ this ->createMock (\AMQPChannel::class),
822
+ $ amqpQueue = $ this ->createMock (\AMQPQueue::class),
823
+ $ amqpExchange = $ this ->createMock (\AMQPExchange::class)
824
+ );
825
+
826
+ $ exception = new \AMQPConnectionException ('a socket error occurred ' );
827
+
828
+ $ amqpExchange ->expects ($ this ->exactly (4 ))
829
+ ->method ('publish ' )
830
+ ->willReturnOnConsecutiveCalls (
831
+ $ this ->throwException ($ exception ),
832
+ $ this ->throwException ($ exception ),
833
+ $ this ->throwException ($ exception ),
834
+ $ this ->throwException ($ exception ),
835
+ );
836
+
837
+ self ::expectException ($ exception ::class);
838
+ self ::expectExceptionMessage ($ exception ->getMessage ());
839
+
840
+ $ connection = Connection::fromDsn ('amqp://localhost ' , [], $ factory );
841
+ $ connection ->publish ('body ' );
842
+ }
843
+
777
844
private function createDelayOrRetryConnection (\AMQPExchange $ delayExchange , string $ deadLetterExchangeName , string $ delayQueueName ): Connection
778
845
{
779
846
$ amqpConnection = $ this ->createMock (\AMQPConnection::class);
0 commit comments