8000 blocking messenger consumer · symfony/symfony@cb7d6e9 · GitHub
[go: up one dir, main page]

Skip to content

Commit cb7d6e9

Browse files
committed
blocking messenger consumer
1 parent 4f0368c commit cb7d6e9

File tree

10 files changed

+19
-21
lines changed

10 files changed

+19
-21
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public function testItReturnsTheDecodedMessageToTheHandlerInBlockingMode()
6363

6464
$connection->method('getQueueNames')->willReturn(['queueName']);
6565
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
66-
call_user_func($callback, $amqpEnvelope, $amqpQueue);
66+
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
6767
});
6868

6969
$receiver = new AmqpReceiver($connection, $serializer);

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public function testReceivesMessagesInBlockingMode()
7474
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
7575
$connection->method('getQueueNames')->willReturn(['queueName']);
7676
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
77-
call_user_func($callback, $amqpEnvelope, $amqpQueue);
77+
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
7878
});
7979

8080
$transport->pull(function (Envelope $envelope) use ($decodedMessage) {

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public function pull(callable $callback): void
5858
*/
5959
public function pullFromQueues(array $queueNames, callable $callback): void
6060
{
61-
if (0 === count($queueNames)) {
61+
if (0 === \count($queueNames)) {
6262
return;
6363
}
6464

@@ -74,7 +74,7 @@ public function pullFromQueues(array $queueNames, callable $callback): void
7474

7575
private function pullEnvelope(string $queueName, ?callable $callback): void
7676
{
77-
if ($callback !== null) {
77+
if (null !== $callback) {
7878
$callback = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $queue) use ($callback) {
7979
$queueName = $queue->getName();
8080
$body = $amqpEnvelope->getBody();

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,9 +432,9 @@ public function get(string $queueName): ?\AMQPEnvelope
432432
* Consume a message from the specified queue in blocking mode.
433433
*
434434
* @param ?callable(\AMQPEnvelope,\AMQPQueue):?false $callback If callback return false, then processing thread will be
435-
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
436-
* will be made available to the first real callback registered. That allows one to have a single callback
437-
* consuming from multiple queues.
435+
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
436+
* will be made available to the first real callback registered. That allows one to have a single callback
437+
* consuming from multiple queues.
438438
*
439439
* @throws \AMQPException
440440
*/

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
3333
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
3434
use Symfony\Component\Messenger\RoutableMessageBus;
35-
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
3635
use Symfony\Component\Messenger\Worker;
3736

3837
/**

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public function testRunWithBlockingModeOption()
7878

7979
$receiver = $this->createMock(QueueBlockingReceiverInterface::class);
8080
$receiver->expects($this->once())->method('pullFromQueues')->willReturnCallback(function (array $queueNames, callable $callback) use ($envelope) {
81-
call_user_func($callback, $envelope);
81+
\call_user_func($callback, $envelope);
8282
});
8383

8484
$receiverLocator = $this->createMock(ContainerInterface::class);

src/Symfony/Component/Messenger/Tests/WorkerTest.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2727
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
2828
use Symfony\Component\Messenger\Exception\RuntimeException;
29-
use Symfony\Component\Messenger\Exception\TransportException;
3029
use Symfony\Component\Messenger\Handler\Acknowledger;
3130
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
3231
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
@@ -653,7 +652,7 @@ public function dispatch(object $event): object
653652
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
654653
$worker->run([
655654
'blocking-mode' => true,
656-
'queues' => ['foo']
655+
'queues' => ['foo'],
657656
]);
658657

659658
$this->assertSame($apiMessage, $envelopes[0]->getMessage());
@@ -741,7 +740,7 @@ public function pull(callable $callback): void
741740
foreach ($envelopes as $envelope) {
742741
$shouldContinue = $callback($envelope);
743742

744-
if ($shouldContinue === false) {
743+
if (false === $shouldContinue) {
745744
return;
746745
}
747746
}

src/Symfony/Component/Messenger/Transport/Receiver/BlockingReceiverInterface.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
interface BlockingReceiverInterface extends ReceiverInterface
2020
{
2121
/**
22-
* @param callable(\AMQPEnvelope):?false $callback If callback return false, then processing thread will be
23-
* returned to PHP script.
22+
* @param callable(\AMQPEnvelope):?false $callback if callback return false, then processing thread will be
23+
* returned to PHP script
2424
*
2525
* @throws TransportException If there is an issue communicating with the transport
2626
*/

src/Symfony/Component/Messenger/Transport/Receiver/QueueBlockingReceiverInterface.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ interface QueueBlockingReceiverInterface extends BlockingReceiverInterface
2121
/**
2222
* Pull messages from the specified queue names instead of consuming from all queues.
2323
*
24-
* @param string[] $queueNames
25-
* @param callable(\AMQPEnvelope):?false $callback If callback return false, then processing thread will be
26-
* returned to PHP script.
24+
* @param string[] $queueNames
25+
* @param callable(\AMQPEnvelope):?false $callback if callback return false, then processing thread will be
26+
* returned to PHP script
2727
*/
2828
public function pullFromQueues(array $queueNames, callable $callback): void;
2929
}

src/Symfony/Component/Messenger/Worker.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ public function run(array $options = []): void
8282
$this->metadata->set(['queueNames' => $queueNames]);
8383

8484
if ($blockingMode) {
85-
if (count($this->receivers) > 1) {
86-
throw new RuntimeException('In blocking mode only one receiver is supported');
85+
if (\count($this->receivers) > 1) {
86+
throw new RuntimeException('In blocking mode only one receiver is supported.');
8787
}
8888

8989
foreach ($this->receivers as $transportName => $receiver) {
@@ -127,10 +127,10 @@ public function run(array $options = []): void
127127
};
128128

129129
if ($queueNames) {
130-
/** @var QueueBlockingReceiverInterface $receiver */
130+
/* @var QueueBlockingReceiverInterface $receiver */
131131
$receiver->pullFromQueues($queueNames, $callback);
132132
} else {
133-
/** @var BlockingReceiverInterface $receiver */
133+
/* @var BlockingReceiverInterface $receiver */
134134
$receiver->pull($callback);
135135
}
136136
} else {

0 commit comments

Comments
 (0)
0