8000 blocking messenger consumer · symfony/symfony@f871f19 · GitHub
[go: up one dir, main page]

Skip to content

Commit f871f19

Browse files
committed
blocking messenger consumer
1 parent b1f78e6 commit f871f19

File tree

10 files changed

+19
-21
lines changed

10 files changed

+19
-21
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public function testItReturnsTheDecodedMessageToTheHandlerInBlockingMode()
6363

6464
$connection->method('getQueueNames')->willReturn(['queueName']);
6565
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
66-
call_user_func($callback, $amqpEnvelope, $amqpQueue);
66+
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
6767
});
6868

6969
$receiver = new AmqpReceiver($connection, $serializer);

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public function testReceivesMessagesInBlockingMode()
7474
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
7575
$connection->method('getQueueNames')->willReturn(['queueName']);
7676
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
77-
call_user_func($callback, $amqpEnvelope, $amqpQueue);
77+
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
7878
});
7979

8080
$transport->pull(function (Envelope $envelope) use ($decodedMessage) {

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public function pull(callable $callback): void
5555
*/
5656
public function pullFromQueues(array $queueNames, callable $callback): void
5757
{
58-
if (0 === count($queueNames)) {
58+
if (0 === \count($queueNames)) {
5959
return;
6060
}
6161

@@ -71,7 +71,7 @@ public function pullFromQueues(array $queueNames, callable $callback): void
7171

7272
private function pullEnvelope(string $queueName, ?callable $callback): void
7373
{
74-
if ($callback !== null) {
74+
if (null !== $callback) {
7575
$callback = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $queue) use ($callback) {
7676
$queueName = $queue->getName();
7777
$body = $amqpEnvelope->getBody();

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -437,9 +437,9 @@ public function get(string $queueName): ?\AMQPEnvelope
437437
* Consume a message from the specified queue in blocking mode.
438438
*
439439
* @param ?callable(\AMQPEnvelope,\AMQPQueue):?false $callback If callback return false, then processing thread will be
440-
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
441-
* will be made available to the first real callback registered. That allows one to have a single callback
442-
* consuming from multiple queues.
440+
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
441+
* will be made available to the first real callback registered. That allows one to have a single callback
442+
* consuming from multiple queues.
443443
*
444444
* @throws \AMQPException
445445
*/

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
3434
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
3535
use Symfony\Component\Messenger\RoutableMessageBus;
36-
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
3736
use Symfony\Component\Messenger\Worker;
3837

3938
/**

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public function testRunWithBlockingModeOption()
7979

8080
$receiver = $this->createMock(QueueBlockingReceiverInterface::class);
8181
$receiver->expects($this->once())->method('pullFromQueues')->willReturnCallback(function (array $queueNames, callable $callback) use ($envelope) {
82-
call_user_func($callback, $envelope);
82+
\call_user_func($callback, $envelope);
8383
});
8484

8585
$receiverLocator = $this->createMock(ContainerInterface::class);

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2929
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
3030
use Symfony\Component\Messenger\Exception\RuntimeException;
31-
use Symfony\Component\Messenger\Exception\TransportException;
3231
use Symfony\Component\Messenger\Handler\Acknowledger;
3332
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
3433
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
@@ -694,7 +693,7 @@ public function dispatch(object $event): object
694693
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
695694
$worker->run([
696695
'blocking-mode' => true,
697-
'queues' => ['foo']
696+
'queues' => ['foo'],
698697
]);
699698

700699
$this->assertSame($apiMessage, $envelopes[0]->getMessage());
@@ -782,7 +781,7 @@ public function pull(callable $callback): void
782781
foreach ($envelopes as $envelope) {
783782
$shouldContinue = $callback($envelope);
784783

785-
if ($shouldContinue === false) {
784+
if (false === $shouldContinue) {
786785
return;
787786
}
788787
}

src/Symfony/Component/Messenger/Transport/Receiver/BlockingReceiverInterface.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
interface BlockingReceiverInterface extends ReceiverInterface
2020
{
2121
/**
22-
* @param callable(\AMQPEnvelope):?false $callback If callback return false, then processing thread will be
23-
* returned to PHP script.
22+
* @param callable(\AMQPEnvelope):?false $callback if callback return false, then processing thread will be
23+
* returned to PHP script
2424
*
2525
* @throws TransportException If there is an issue communicating with the transport
2626
*/

src/Symfony/Component/Messenger/Transport/Receiver/QueueBlockingReceiverInterface.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ interface QueueBlockingReceiverInterface extends BlockingReceiverInterface
2121
/**
2222
* Pull messages from the specified queue names instead of consuming from all queues.
2323
*
24-
* @param string[] $queueNames
25-
* @param callable(\AMQPEnvelope):?false $callback If callback return false, then processing thread will be
26-
* returned to PHP script.
24+
* @param string[] $queueNames
25+
* @param callable(\AMQPEnvelope):?false $callback if callback return false, then processing thread will be
26+
* returned to PHP script
2727
*/
2828
public function pullFromQueues(array $queueNames, callable $callback): void;
2929
}

src/Symfony/Component/Messenger/Worker.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ public function run(array $options = []): void
8484
$this->metadata->set(['queueNames' => $queueNames]);
8585

8686
if ($blockingMode) {
87-
if (count($this->receivers) > 1) {
88-
throw new RuntimeException('In blocking mode only one receiver is supported');
87+
if (\count($this->receivers) > 1) {
88+
throw new RuntimeException('In blocking mode only one receiver is supported.');
8989
}
9090

9191
foreach ($this->receivers as $transportName => $receiver) {
@@ -130,10 +130,10 @@ public function run(array $options = []): void
130130
};
131131

132132
if ($queueNames) {
133-
/** @var QueueBlockingReceiverInterface $receiver */
133+
/* @var QueueBlockingReceiverInterface $receiver */
134134
$receiver->pullFromQueues($queueNames, $callback);
135135
} else {
136-
/** @var BlockingReceiverInterface $receiver */
136+
/* @var BlockingReceiverInterface $receiver */
137137
$receiver->pull($callback);
138138
}
139139
} else {

0 commit comments

Comments
 (0)
0