1616use Pheanstalk \Contract \PheanstalkSubscriberInterface ;
1717use Pheanstalk \Exception ;
1818use Pheanstalk \Exception \ClientException ;
19+ use Pheanstalk \Exception \ConnectionException ;
1920use Pheanstalk \Exception \DeadlineSoonException ;
2021use Pheanstalk \Exception \ServerException ;
2122use Pheanstalk \Pheanstalk ;
@@ -131,6 +132,7 @@ public function testItThrowsAnExceptionIfAnExtraOptionIsDefinedInDSN()
131132 public function testGet ()
132133 {
133134 $ id = '1234 ' ;
135+ $ id2 = '1235 ' ;
134136 $ beanstalkdEnvelope = [
135137 'body ' => 'foo ' ,
136138 'headers ' => 'bar ' ,
@@ -140,13 +142,52 @@ public function testGet()
140142 $ timeout = 44 ;
141143
142144 $ tubeList = new TubeList ($ tubeName = new TubeName ($ tube ), $ tubeNameDefault = new TubeName ('default ' ));
143- $ job = new Job (new JobId ($ id ), json_encode ($ beanstalkdEnvelope ));
144145
145146 $ client = $ this ->createMock (PheanstalkInterface::class);
146147 $ client ->expects ($ this ->once ())->method ('watch ' )->with ($ tubeName )->willReturn (2 );
147148 $ client ->expects ($ this ->once ())->method ('listTubesWatched ' )->willReturn ($ tubeList );
148149 $ client ->expects ($ this ->once ())->method ('ignore ' )->with ($ tubeNameDefault )->willReturn (1 );
149- $ client ->expects ($ this ->once ())->method ('reserveWithTimeout ' )->with ($ timeout )->willReturn ($ job );
150+ $ client ->expects ($ this ->exactly (2 ))->method ('reserveWithTimeout ' )->with ($ timeout )->willReturnOnConsecutiveCalls (
151+ new Job (new JobId ($ id ), json_encode ($ beanstalkdEnvelope )),
152+ new Job (new JobId ($ id2 ), json_encode ($ beanstalkdEnvelope )),
153+ );
154+
155+ $ connection = new Connection (['tube_name ' => $ tube , 'timeout ' => $ timeout ], $ client );
156+
157+ $ envelope = $ connection ->get ();
158+
159+ $ this ->assertSame ($ id , $ envelope ['id ' ]);
160+ $ this ->assertSame ($ beanstalkdEnvelope ['body ' ], $ envelope ['body ' ]);
161+ $ this ->assertSame ($ beanstalkdEnvelope ['headers ' ], $ envelope ['headers ' ]);
162+
163+ $ envelope = $ connection ->get ();
164+
165+ $ this ->assertSame ($ id2 , $ envelope ['id ' ]);
166+ $ this ->assertSame ($ beanstalkdEnvelope ['body ' ], $ envelope ['body ' ]);
167+ $ this ->assertSame ($ beanstalkdEnvelope ['headers ' ], $ envelope ['headers ' ]);
168+ }
169+
170+ public function testGetOnReconnect ()
171+ {
172+ $ id = '1234 ' ;
173+ $ beanstalkdEnvelope = [
174+ 'body ' => 'foo ' ,
175+ 'headers ' => 'bar ' ,
176+ ];
177+
178+ $ tube = 'baz ' ;
179+ $ timeout = 44 ;
180+
181+ $ tubeList = new TubeList ($ tubeName = new TubeName ($ tube ), $ tubeNameDefault = new TubeName ('default ' ));
182+
183+ $ client = $ this ->createMock (PheanstalkInterface::class);
184+ $ client ->expects ($ this ->exactly (2 ))->method ('watch ' )->with ($ tubeName )->willReturn (2 );
185+ $ client ->expects ($ this ->exactly (2 ))->method ('listTubesWatched ' )->willReturn ($ tubeList );
186+ $ client ->expects ($ this ->exactly (2 ))->method ('ignore ' )->with ($ tubeNameDefault )->willReturn (1 );
187+ $ client ->expects ($ this ->exactly (2 ))->method ('reserveWithTimeout ' )->with ($ timeout )->willReturnOnConsecutiveCalls (
188+ $ this ->throwException (new ConnectionException ('123 ' , 'foobar ' )),
189+ new Job (new JobId ($ id ), json_encode ($ beanstalkdEnvelope )),
190+ );
150191
151192 $ connection = new Connection (['tube_name ' => $ tube , 'timeout ' => $ timeout ], $ client );
152193
@@ -370,10 +411,11 @@ public function testSend()
370411 $ expectedDelay = $ delay / 1000 ;
371412
372413 $ id = '110 ' ;
414+ $ id2 = '111 ' ;
373415
374416 $ client = $ this ->createMock (PheanstalkInterface::class);
375417 $ client ->expects ($ this ->once ())->method ('useTube ' )->with (new TubeName ($ tube ));
376- $ client ->expects ($ this ->once ( ))->method ('put ' )->with (
418+ $ client ->expects ($ this ->exactly ( 2 ))->method ('put ' )->with (
377419 $ this ->callback (function (string $ data ) use ($ body , $ headers ): bool {
378420 $ expectedMessage = json_encode ([
379421 'body ' => $ body ,
@@ -385,7 +427,51 @@ public function testSend()
385427 1024 ,
386428 $ expectedDelay ,
387429 90
388- )->willReturn (new Job (new JobId ($ id ), 'foobar ' ));
430+ )->willReturnOnConsecutiveCalls (
431+ new Job (new JobId ($ id ), 'foobar ' ),
432+ new Job (new JobId ($ id2 ), 'foobar ' ),
433+ );
434+
435+ $ connection = new Connection (['tube_name ' => $ tube ], $ client );
436+
437+ $ returnedId = $ connection ->send ($ body , $ headers , $ delay );
438+
439+ $ this ->assertSame ($ id , $ returnedId );
440+
441+ $ returnedId = $ connection ->send ($ body , $ headers , $ delay );
442+
443+ $ this ->assertSame ($ id2 , $ returnedId );
444+ }
445+
446+ public function testSendOnReconnect ()
447+ {
448+ $ tube = 'xyz ' ;
449+
450+ $ body = 'foo ' ;
451+ $ headers = ['test ' => 'bar ' ];
452+ $ delay = 1000 ;
453+ $ expectedDelay = $ delay / 1000 ;
454+
455+ $ id = '110 ' ;
456+
457+ $ client = $ this ->createMock (PheanstalkInterface::class);
458+ $ client ->expects ($ this ->exactly (2 ))->method ('useTube ' )->with (new TubeName ($ tube ));
459+ $ client ->expects ($ this ->exactly (2 ))->method ('put ' )->with (
460+ $ this ->callback (function (string $ data ) use ($ body , $ headers ): bool {
461+ $ expectedMessage = json_encode ([
462+ 'body ' => $ body ,
463+ 'headers ' => $ headers ,
464+ ]);
465+
466+ return $ expectedMessage === $ data ;
467+ }),
468+ 1024 ,
469+ $ expectedDelay ,
470+ 90
471+ )->willReturnOnConsecutiveCalls (
472+ $ this ->throwException (new ConnectionException ('123 ' , 'foobar ' )),
473+ new Job (new JobId ($ id ), 'foobar ' ),
474+ );
389475
390476 $ connection = new Connection (['tube_name ' => $ tube ], $ client );
391477
@@ -520,4 +606,5 @@ public function testSendWithRoundedDelay()
520606
521607interface PheanstalkInterface extends PheanstalkPublisherInterface, PheanstalkSubscriberInterface, PheanstalkManagerInterface
522608{
609+ public function disconnect (): void ;
523610}
0 commit comments