8000 [Messenger] Use newer version of Beanstalkd bridge library · symfony/symfony@de86b5c · GitHub
[go: up one dir, main page]

Skip to content
< 8000 div hidden="hidden" data-view-component="true" class="js-stale-session-flash stale-session-flash flash flash-warn flash-full"> Dismiss alert

Commit de86b5c

Browse files
HypeMCfabpot
authored andcommitted
[Messenger] Use newer version of Beanstalkd bridge library
1 parent f803dc1 commit de86b5c

File tree

4 files changed

+149
-108
lines changed

4 files changed

+149
-108
lines changed

composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@
144144
"monolog/monolog": "^3.0",
145145
"nikic/php-parser": "^4.18|^5.0",
146146
"nyholm/psr7": "^1.0",
147-
"pda/pheanstalk": "^4.0",
147+
"pda/pheanstalk": "^5.1|^7.0",
148148
"php-http/discovery": "^1.15",
149149
"php-http/httplug": "^1.0|^2.0",
150150
"phpdocumentor/reflection-docblock": "^5.2",

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

+70-52
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,21 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Transport;
1313

14-
use Pheanstalk\Contract\PheanstalkInterface;
14+
use Pheanstalk\Contract\PheanstalkManagerInterface;
15+
use Pheanstalk\Contract\PheanstalkPublisherInterface;
16+
use Pheanstalk\Contract\PheanstalkSubscriberInterface;
1517
use Pheanstalk\Exception;
1618
use Pheanstalk\Exception\ClientException;
1719
use Pheanstalk\Exception\DeadlineSoonException;
1820
use Pheanstalk\Exception\ServerException;
19-
use Pheanstalk\Job;
20-
use Pheanstalk\JobId;
2121
use Pheanstalk\Pheanstalk;
22-
use Pheanstalk\Response\ArrayResponse;
22+
use Pheanstalk\Values\Job;
23+
use Pheanstalk\Values\JobId;
24+
use Pheanstalk\Values\JobState;
25+
use Pheanstalk\Values\JobStats;
26+
use Pheanstalk\Values\TubeList;
27+
use Pheanstalk\Values\TubeName;
28+
use Pheanstalk\Values\TubeStats;
2329
use PHPUnit\Framework\TestCase;
2430
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
2531
use Symfony\Component\Messenger\Exception\InvalidArgumentException as MessengerInvalidArgumentException;
@@ -124,7 +130,7 @@ public function testItThrowsAnExceptionIfAnExtraOptionIsDefinedInDSN()
124130

125131
public function testGet()
126132
{
127-
$id = 1234;
133+
$id = '1234';
128134
$beanstalkdEnvelope = [
129135
'body' => 'foo',
130136
'headers' => 'bar',
@@ -133,17 +139,20 @@ public function testGet()
133139
$tube = 'baz';
134140
$timeout = 44;
135141

136-
$job = new Job($id, json_encode($beanstalkdEnvelope));
142+
$tubeList = new TubeList($tubeName = new TubeName($tube), $tubeNameDefault = new TubeName('default'));
143+
$job = new Job(new JobId($id), json_encode($beanstalkdEnvelope));
137144

138145
$client = $this->createMock(PheanstalkInterface::class);
139-
$client->expects($this->once())->method('watchOnly')->with($tube)->willReturn($client);
146+
$client->expects($this->once())->method('watch')->with($tubeName)->willReturn(2);
147+
$client->expects($this->once())->method('listTubesWatched')->willReturn($tubeList);
148+
$client->expects($this->once())->method('ignore')->with($tubeNameDefault)->willReturn(1);
140149
$client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willReturn($job);
141150

142151
$connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client);
143152

144153
$envelope = $connection->get();
145154

146-
$this->assertSame((string) $id, $envelope['id']);
155+
$this->assertSame($id, $envelope['id']);
147156
$this->assertSame($beanstalkdEnvelope['body'], $envelope['body']);
148157
$this->assertSame($beanstalkdEnvelope['headers'], $envelope['headers']);
149158
}
@@ -154,7 +163,9 @@ public function testGetWhenThereIsNoJobInTheTube()
154163
$timeout = 44;
155164

156165
$client = $this->createMock(PheanstalkInterface::class);
157-
$client->expects($this->once())->method('watchOnly')->with($tube)->willReturn($client);
166+
$client->expects($this->once())->method('watch')->with(new TubeName($tube))->willReturn(1);
167+
$client->expects($this->never())->method('listTubesWatched');
168+
$client->expects($this->never())->method('ignore');
158169
$client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willReturn(null);
159170

160171
$connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client);
@@ -170,7 +181,9 @@ public function testGetWhenABeanstalkdExceptionOccurs()
170181
$exception = new DeadlineSoonException('foo error');
171182

172183
$client = $this->createMock(PheanstalkInterface::class);
173-
$client->expects($this->once())->method('watchOnly')->with($tube)->willReturn($client);
184+
$client->expects($this->once())->method('watch')->with(new TubeName($tube))->willReturn(1);
185+
$client->expects($this->never())->method('listTubesWatched');
186+
$client->expects($this->never())->method('ignore');
174187
$client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willThrowException($exception);
175188

176189
$connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client);
@@ -181,35 +194,35 @@ public function testGetWhenABeanstalkdExceptionOccurs()
181194

182195
public function testAck()
183196
{
184-
$id = 123456;
197+
$id = '123456';
185198

186199
$tube = 'xyz';
187200

188201
$client = $this->createMock(PheanstalkInterface::class);
189-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
202+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
190203
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
191204

192205
$connection = new Connection(['tube_name' => $tube], $client);
193206

194-
$connection->ack((string) $id);
207+
$connection->ack($id);
195208
}
196209

197210
public function testAckWhenABeanstalkdExceptionOccurs()
198211
{
199-
$id = 123456;
212+
$id = '123456';
200213

201214
$tube = 'xyzw';
202215

203216
$exception = new ServerException('baz error');
204217

205218
$client = $this->createMock(PheanstalkInterface::class);
206-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
219+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
207220
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
208221

209222
$connection = new Connection(['tube_name' => $tube], $client);
210223

211224
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
212-
$connection->ack((string) $id);
225+
$connection->ack($id);
213226
}
214227

215228
/**
@@ -219,66 +232,66 @@ public function testAckWhenABeanstalkdExceptionOccurs()
219232
*/
220233
public function testReject(bool $buryOnReject, bool $forceDelete)
221234
{
222-
$id = 123456;
235+
$id = '123456';
223236

224237
$tube = 'baz';
225238

226239
$client = $this->createMock(PheanstalkInterface::class);
227-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
240+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
228241
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
229242

230243
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => $buryOnReject], $client);
231244

232-
$connection->reject((string) $id, null, $forceDelete);
245+
$connection->reject($id, null, $forceDelete);
233246
}
234247

235248
public function testRejectWithBury()
236249
{
237-
$id = 123456;
250+
$id = '123456';
238251

239252
$tube = 'baz';
240253

241254
$client = $this->createMock(PheanstalkInterface::class);
242-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
255+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
243256
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), 1024);
244257

245258
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
246259

247-
$connection->reject((string) $id);
260+
$connection->reject($id);
248261
}
249262

250263
public function testRejectWithBuryAndPriority()
251264
{
252-
$id = 123456;
265+
$id = '123456';
253266
$priority = 2;
254267

255268
$tube = 'baz';
256269

257270
$client = $this->createMock(PheanstalkInterface::class);
258-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
271+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
259272
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), $priority);
260273

261274
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
262275

263-
$connection->reject((string) $id, $priority);
276+
$connection->reject($id, $priority);
264277
}
265278

266279
public function testRejectWhenABeanstalkdExceptionOccurs()
267280
{
268-
$id = 123456;
281+
$id = '123456';
269282

270283
$tube = 'baz123';
271284

272285
$exception = new ServerException('baz error');
273286

274287
$client = $this->createMock(PheanstalkInterface::class);
275-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
288+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
276289
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
277290

278291
$connection = new Connection(['tube_name' => $tube], $client);
279292

280293
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
281-
$connection->reject((string) $id);
294+
$connection->reject($id);
282295
}
283296

284297
public function testMessageCount()
@@ -287,10 +300,11 @@ public function testMessageCount()
287300

288301
$count = 51;
289302

290-
$response = new ArrayResponse('OK', ['current-jobs-ready' => $count]);
303+
$response = new TubeStats($tubeName = new TubeName($tube), 0, 51, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
291304

292305
$client = $this->createMock(PheanstalkInterface::class);
293-
$client->expects($this->once())->method('statsTube')->with($tube)->willReturn($response);
306+
$client->expects($this->once())->method('useTube')->with($tubeName);
307+
$client->expects($this->once())->method('statsTube')->with($tubeName)->willReturn($response);
294308

295309
$connection = new Connection(['tube_name' => $tube], $client);
296310

@@ -304,7 +318,7 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
304318
$exception = new ClientException('foobar error');
305319

306320
$client = $this->createMock(PheanstalkInterface::class);
307-
$client->expects($this->once())->method('statsTube')->with($tube)->willThrowException($exception);
321+
$client->expects($this->once())->method('statsTube')->with(new TubeName($tube))->willThrowException($exception);
308322

309323
$connection = new Connection(['tube_name' => $tube], $client);
310324

@@ -314,24 +328,24 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
314328

315329
public function testMessagePriority()
316330
{
317-
$id = 123456;
331+
$id = '123456';
318332
$priority = 51;
319333

320334
$tube = 'baz';
321335

322-
$response = new ArrayResponse('OK', ['pri' => $priority]);
336+
$response = new JobStats(new JobId($id), new TubeName($tube), JobState::READY, $priority, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
323337

324338
$client = $this->createMock(PheanstalkInterface::class);
325339
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willReturn($response);
326340

327341
$connection = new Connection(['tube_name' => $tube], $client);
328342

329-
$this->assertSame($priority, $connection->getMessagePriority((string) $id));
343+
$this->assertSame($priority, $connection->getMessagePriority($id));
330344
}
331345

332346
public function testMessagePriorityWhenABeanstalkdExceptionOccurs()
333347
{
334-
$id = 123456;
348+
$id = '123456';
335349

336350
$tube = 'baz1234';
337351

@@ -343,7 +357,7 @@ public function testMessagePriorityWhenABeanstalkdExceptionOccurs()
343357
$connection = new Connection(['tube_name' => $tube], $client);
344358

345359
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
346-
$connection->getMessagePriority((string) $id);
360+
$connection->getMessagePriority($id);
347361
}
348362

349363
public function testSend()
@@ -355,10 +369,10 @@ public function testSend()
355369
$delay = 1000;
356370
$expectedDelay = $delay / 1000;
357371

358-
$id = 110;
372+
$id = '110';
359373

360374
$client = $this->createMock(PheanstalkInterface::class);
361-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
375+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
362376
$client->expects($this->once())->method('put')->with(
363377
$this->callback(function (string $data) use ($body, $headers): bool {
364378
$expectedMessage = json_encode([
@@ -371,13 +385,13 @@ public function testSend()
371385
1024,
372386
$expectedDelay,
373387
90
374-
)->willReturn(new Job($id, 'foobar'));
388+
)->willReturn(new Job(new JobId($id), 'foobar'));
375389

376390
$connection = new Connection(['tube_name' => $tube], $client);
377391

378392
$returnedId = $connection->send($body, $headers, $delay);
379393

380-
$this->assertSame($id, (int) $returnedId);
394+
$this->assertSame($id, $returnedId);
381395
}
382396

383397
public function testSendWithPriority()
@@ -390,10 +404,10 @@ public function testSendWithPriority()
390404
$priority = 2;
391405
$expectedDelay = $delay / 1000;
392406

393-
$id = 110;
407+
$id = '110';
394408

395409
$client = $this->createMock(PheanstalkInterface::class);
396-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
410+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
397411
$client->expects($this->once())->method('put')->with(
398412
$this->callback(function (string $data) use ($body, $headers): bool {
399413
$expectedMessage = json_encode([
@@ -406,13 +420,13 @@ public function testSendWithPriority()
406420
$priority,
407421
$expectedDelay,
408422
90
409-
)->willReturn(new Job($id, 'foobar'));
423+
)->willReturn(new Job(new JobId($id), 'foobar'));
410424

411425
$connection = new Connection(['tube_name' => $tube], $client);
412426

413427
$returnedId = $connection->send($body, $headers, $delay, $priority);
414428

415-
$this->assertSame($id, (int) $returnedId);
429+
$this->assertSame($id, $returnedId);
416430
}
417431

418432
public function testSendWhenABeanstalkdExceptionOccurs()
@@ -427,7 +441,7 @@ public function testSendWhenABeanstalkdExceptionOccurs()
427441
$exception = new Exception('foo bar');
428442

429443
$client = $this->createMock(PheanstalkInterface::class);
430-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
444+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
431445
$client->expects($this->once())->method('put')->with(
432446
$this->callback(function (string $data) use ($body, $headers): bool {
433447
$expectedMessage = json_encode([
@@ -451,35 +465,35 @@ public function testSendWhenABeanstalkdExceptionOccurs()
451465

452466
public function testKeepalive()
453467
{
454-
$id = 123456;
468+
$id = '123456';
455469

456470
$tube = 'baz';
457471

458472
$client = $this->createMock(PheanstalkInterface::class);
459-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
473+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
460474
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
461475

462476
$connection = new Connection(['tube_name' => $tube], $client);
463477

464-
$connection->keepalive((string) $id);
478+
$connection->keepalive($id);
465479
}
466480

467481
public function testKeepaliveWhenABeanstalkdExceptionOccurs()
468482
{
469-
$id = 123456;
483+
$id = '123456';
470484

471485
$tube = 'baz123';
472486

473487
$exception = new ServerException('baz error');
474488

475489
$client = $this->createMock(PheanstalkInterface::class);
476-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
490+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
477491
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
478492

479493
$connection = new Connection(['tube_name' => $tube], $client);
480494

481495
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
482-
$connection->keepalive((string) $id);
496+
$connection->keepalive($id);
483497
}
484498

485499
public function testSendWithRoundedDelay()
@@ -491,7 +505,7 @@ public function testSendWithRoundedDelay()
491505
$expectedDelay = 0;
492506

493507
$client = $this->createMock(PheanstalkInterface::class);
494-
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
508+
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
495509
$client->expects($this->once())->method('put')->with(
496510
$this->anything(),
497511
$this->anything(),
@@ -503,3 +517,7 @@ public function testSendWithRoundedDelay()
503517
$connection->send($body, $headers, $delay);
504518
}
505519
}
520+
521+
interface PheanstalkInterface extends PheanstalkPublisherInterface, PheanstalkSubscriberInterface, PheanstalkManagerInterface
522+
{
523+
}

0 commit comments

Comments
 (0)
0