10000 [amqp] set signal socket helper in context's constructor. · enumag/enqueue-dev@2520548 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2520548

Browse files
committed
[amqp] set signal socket helper in context's constructor.
Make it easy to write test
1 parent 107ea8c commit 2520548

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

pkg/amqp-bunny/AmqpContext.php

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
5555
*/
5656
private $subscribers;
5757

58+
/**
59+
* @var SignalSocketHelper
60+
*/
61+
private $signalSocketHandler;
62+
5863
/**
5964
* Callable must return instance of \Bunny\Channel once called.
6065
*
@@ -80,6 +85,7 @@ public function __construct($bunnyChannel, $config = [])
8085

8186
$this->buffer = new Buffer();
8287
$this->subscribers = [];
88+
$this->signalSocketHandler = new SignalSocketHelper();
8389
}
8490

8591
/**
@@ -390,19 +396,18 @@ public function consume($timeout = 0)
390396
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
391397
}
392398

393-
$socketHelper = new SignalSocketHelper();
394-
$socketHelper->beforeSocket();
399+
$this->signalSocketHandler->beforeSocket();
395400

396401
try {
397402
$this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
398403
} catch (ClientException $e) {
399-
if ('stream_select() failed.' == $e->getMessage() && $socketHelper->wasThereSignal()) {
404+
if ('stream_select() failed.' == $e->getMessage() && $this->signalSocketHandler->wasThereSignal()) {
400405
return;
401406
}
402407

403408
throw $e;
404409
} finally {
405-
$socketHelper->afterSocket();
410+
$this->signalSocketHandler->afterSocket();
406411
}
407412
}
408413

pkg/amqp-lib/AmqpContext.php

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
5757
*/
5858
private $subscribers;
5959

60+
/**
61+
* @var SignalSocketHelper
62+
*/
63+
private $signalSocketHandler;
64+
6065
/**
6166
* @param AbstractConnection $connection
6267
* @param array $config
@@ -73,6 +78,7 @@ public function __construct(AbstractConnection $connection, $config = [])
7378
$this->connection = $connection;
7479
$this->buffer = new Buffer();
7580
$this->subscribers = [];
81+
$this->signalSocketHandler = new SignalSocketHelper();
7682
}
7783

7884
/**
@@ -384,8 +390,7 @@ public function consume($timeout = 0)
384390
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
385391
}
386392

387-
$socketHelper = new SignalSocketHelper();
388-
$socketHelper->beforeSocket();
393+
$this->signalSocketHandler->beforeSocket();
389394

390395
try {
391396
while (true) {
@@ -408,13 +413,13 @@ public function consume($timeout = 0)
408413
} catch (AMQPTimeoutException $e) {
409414
} catch (StopBasicConsumptionException $e) {
410415
} catch (AMQPIOWaitException $e) {
411-
if ($socketHelper->wasThereSignal()) {
416+
if ($this->signalSocketHandler->wasThereSignal()) {
412417
return;
413418
}
414419

415420
throw $e;
416421
} finally {
417-
$socketHelper->afterSocket();
422+
$this->signalSocketHandler->afterSocket();
418423
}
419424
}
420425

0 commit comments

Comments
 (0)
0