16
16
use Pheanstalk \Contract \PheanstalkSubscriberInterface ;
17
17
use Pheanstalk \Exception ;
18
18
use Pheanstalk \Exception \ClientException ;
19
+ use Pheanstalk \Exception \ConnectionException ;
19
20
use Pheanstalk \Exception \DeadlineSoonException ;
20
21
use Pheanstalk \Exception \ServerException ;
21
22
use Pheanstalk \Pheanstalk ;
@@ -131,6 +132,7 @@ public function testItThrowsAnExceptionIfAnExtraOptionIsDefinedInDSN()
131
132
public function testGet ()
132
133
{
133
134
$ id = '1234 ' ;
135
+ $ id2 = '1235 ' ;
134
136
$ beanstalkdEnvelope = [
135
137
'body ' => 'foo ' ,
136
138
'headers ' => 'bar ' ,
@@ -140,13 +142,52 @@ public function testGet()
140
142
$ timeout = 44 ;
141
143
142
144
$ tubeList = new TubeList ($ tubeName = new TubeName ($ tube ), $ tubeNameDefault = new TubeName ('default ' ));
143
- $ job = new Job (new JobId ($ id ), json_encode ($ beanstalkdEnvelope ));
144
145
145
146
$ client = $ this ->createMock (PheanstalkInterface::class);
146
147
$ client ->expects ($ this ->once ())->method ('watch ' )->with ($ tubeName )->willReturn (2 );
147
148
$ client ->expects ($ this ->once ())->method ('listTubesWatched ' )->willReturn ($ tubeList );
148
149
$ client ->expects ($ this ->once ())->method ('ignore ' )->with ($ tubeNameDefault )->willReturn (1 );
149
- $ client ->expects ($ this ->once ())->method ('reserveWithTimeout ' )->with ($ timeout )->willReturn ($ job );
150
+ $ client ->expects ($ this ->exactly (2 ))->method ('reserveWithTimeout ' )->with ($ timeout )->willReturnOnConsecutiveCalls (
151
+ new Job (new JobId ($ id ), json_encode ($ beanstalkdEnvelope )),
152
+ new Job (new JobId ($ id2 ), json_encode ($ beanstalkdEnvelope )),
153
+ );
154
+
155
+ $ connection = new Connection (['tube_name ' => $ tube , 'timeout ' => $ timeout ], $ client );
156
+
157
+ $ envelope = $ connection ->get ();
158
+
159
+ $ this ->assertSame ($ id , $ envelope ['id ' ]);
160
+ $ this ->assertSame ($ beanstalkdEnvelope ['body ' ], $ envelope ['body ' ]);
161
+ $ this ->assertSame ($ beanstalkdEnvelope ['headers ' ], $ envelope ['headers ' ]);
162
+
163
+ $ envelope = $ connection ->get ();
164
+
165
+ $ this ->assertSame ($ id2 , $ envelope ['id ' ]);
166
+ $ this ->assertSame ($ beanstalkdEnvelope ['body ' ], $ envelope ['body ' ]);
167
+ $ this ->assertSame ($ beanstalkdEnvelope ['headers ' ], $ envelope ['headers ' ]);
168
+ }
169
+
170
+ public function testGetOnReconnect ()
171
+ {
172
+ $ id = '1234 ' ;
173
+ $ beanstalkdEnvelope = [
174
+ 'body ' => 'foo ' ,
175
+ 'headers ' => 'bar ' ,
176
+ ];
177
+
178
+ $ tube = 'baz ' ;
179
+ $ timeout = 44 ;
180
+
181
+ $ tubeList = new TubeList ($ tubeName = new TubeName ($ tube ), $ tubeNameDefault = new TubeName ('default ' ));
182
+
183
+ $ client = $ this ->createMock (PheanstalkInterface::class);
184
+ $ client ->expects ($ this ->exactly (2 ))->method ('watch ' )->with ($ tubeName )->willReturn (2 );
185
+ $ client ->expects ($ this ->exactly (2 ))->method ('listTubesWatched ' )->willReturn ($ tubeList );
186
+ $ client ->expects ($ this ->exactly (2 ))->method ('ignore ' )->with ($ tubeNameDefault )->willReturn (1 );
187
+ $ client ->expects ($ this ->exactly (2 ))->method ('reserveWithTimeout ' )->with ($ timeout )->willReturnOnConsecutiveCalls (
188
+ $ this ->throwException (new ConnectionException ('123 ' , 'foobar ' )),
189
+ new Job (new JobId ($ id ), json_encode ($ beanstalkdEnvelope )),
190
+ );
150
191
151
192
$ connection = new Connection (['tube_name ' => $ tube , 'timeout ' => $ timeout ], $ client );
152
193
@@ -370,10 +411,11 @@ public function testSend()
370
411
$ expectedDelay = $ delay / 1000 ;
371
412
372
413
$ id = '110 ' ;
414
+ $ id2 = '111 ' ;
373
415
374
416
$ client = $ this ->createMock (PheanstalkInterface::class);
375
417
$ client ->expects ($ this ->once ())->method ('useTube ' )->with (new TubeName ($ tube ));
376
- $ client ->expects ($ this ->once ( ))->method ('put ' )->with (
418
+ $ client ->expects ($ this ->exactly ( 2 ))->method ('put ' )->with (
377
419
$ this ->callback (function (string $ data ) use ($ body , $ headers ): bool {
378
420
$ expectedMessage = json_encode ([
379
421
'body ' => $ body ,
@@ -385,7 +427,51 @@ public function testSend()
385
427
1024 ,
386
428
$ expectedDelay ,
387
429
90
388
- )->willReturn (new Job (new JobId ($ id ), 'foobar ' ));
430
+ )->willReturnOnConsecutiveCalls (
431
+ new Job (new JobId ($ id ), 'foobar ' ),
432
+ new Job (new JobId ($ id2 ), 'foobar ' ),
433
+ );
434
+
435
+ $ connection = new Connection (['tube_name ' => $ tube ], $ client );
436
+
437
+ $ returnedId = $ connection ->send ($ body , $ headers , $ delay );
438
+
439
+ $ this ->assertSame ($ id , $ returnedId );
440
+
441
+ $ returnedId = $ connection ->send ($ body , $ headers , $ delay );
442
+
443
+ $ this ->assertSame ($ id2 , $ returnedId );
444
+ }
445
+
446
+ public function testSendOnReconnect ()
447
+ {
448
+ $ tube = 'xyz ' ;
449
+
450
+ $ body = 'foo ' ;
451
+ $ headers = ['test ' => 'bar ' ];
452
+ $ delay = 1000 ;
453
+ $ expectedDelay = $ delay / 1000 ;
454
+
455
+ $ id = '110 ' ;
456
+
457
+ $ client = $ this ->createMock (PheanstalkInterface::class);
458
+ $ client ->expects ($ this ->exactly (2 ))->method ('useTube ' )->with (new TubeName ($ tube ));
459
+ $ client ->expects ($ this ->exactly (2 ))->method ('put ' )->with (
460
+ $ this ->callback (function (string $ data ) use ($ body , $ headers ): bool {
461
+ $ expectedMessage = json_encode ([
462
+ 'body ' => $ body ,
463
+ 'headers ' => $ headers ,
464
+ ]);
465
+
466
+ return $ expectedMessage === $ data ;
467
+ }),
468
+ 1024 ,
469
+ $ expectedDelay ,
470
+ 90
471
+ )->willReturnOnConsecutiveCalls (
472
+ $ this ->throwException (new ConnectionException ('123 ' , 'foobar ' )),
473
+ new Job (new JobId ($ id ), 'foobar ' ),
474
+ );
389
475
390
476
$ connection = new Connection (['tube_name ' => $ tube ], $ client );
391
477
@@ -520,4 +606,5 @@ public function testSendWithRoundedDelay()
520
606
521
607
interface PheanstalkInterface extends PheanstalkPublisherInterface, PheanstalkSubscriberInterface, PheanstalkManagerInterface
522
608
{
609
+ public function disconnect (): void ;
523
610
}
0 commit comments