8000 [Messenger] Blocking consumer by melihovv · Pull Request #47226 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] Blocking consumer #47226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
blocking messenger consumer
  • Loading branch information
melihovv committed Jul 4, 2023
commit f871f19b9897718085d3f9845ce48f9719e18bac
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public function testItReturnsTheDecodedMessageToTheHandlerInBlockingMode()

$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
call_user_func($callback, $amqpEnvelope, $amqpQueue);
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
});

$receiver = new AmqpReceiver($connection, $serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public function testReceivesMessagesInBlockingMode()
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
call_user_func($callback, $amqpEnvelope, $amqpQueue);
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
});

$transport->pull(function (Envelope $envelope) use ($decodedMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public function pull(callable $callback): void
*/
public function pullFromQueues(array $queueNames, callable $callback): void
{
if (0 === count($queueNames)) {
if (0 === \count($queueNames)) {
return;
}

Expand All @@ -71,7 +71,7 @@ public function pullFromQueues(array $queueNames, callable $callback): void

private function pullEnvelope(string $queueName, ?callable $callback): void
{
if ($callback !== null) {
if (null !== $callback) {
$callback = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $queue) use ($callback) {
$queueName = $queue->getName();
$body = $amqpEnvelope->getBody();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,9 @@ public function get(string $queueName): ?\AMQPEnvelope
* Consume a message from the specified queue in blocking mode.
*
* @param ?callable(\AMQPEnvelope,\AMQPQueue):?false $callback If callback return false, then processing thread will be
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
* will be made available to the first real callback registered. That allows one to have a single callback
* consuming from multiple queues.
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
* will be made available to the first real callback registered. That allows one to have a single callback
* consuming from multiple queues.
*
* @throws \AMQPException
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface;
use Symfony\Component\Messenger\Worker;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public function testRunWithBlockingModeOption()

$receiver = $this->createMock(QueueBlockingReceiverInterface::class);
$receiver->expects($this->once())->method('pullFromQueues')->willReturnCallback(function (array $queueNames, callable $callback) use ($envelope) {
call_user_func($callback, $envelope);
\call_user_func($callback, $envelope);
});

$receiverLocator = $this->createMock(ContainerInterface::class);
Expand Down
5 changes: 2 additions & 3 deletions src/Symfony/Component/Messenger/Tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\RuntimeException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
Expand Down Expand Up @@ -694,7 +693,7 @@ public function dispatch(object $event): object
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
$worker->run([
'blocking-mode' => true,
'queues' => ['foo']
'queues' => ['foo'],
]);

$this->assertSame($apiMessage, $envelopes[0]->getMessage());
Expand Down Expand Up @@ -782,7 +781,7 @@ public function pull(callable $callback): void
foreach ($envelopes as $envelope) {
$shouldContinue = $callback($envelope);

23D3 if ($shouldContinue === false) {
if (false === $shouldContinue) {
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
interface BlockingReceiverInterface extends ReceiverInterface
{
/**
* @param callable(\AMQPEnvelope):?false $callback If callback return false, then processing thread will be
* returned to PHP script.
* @param callable(\AMQPEnvelope):?false $callback if callback return false, then processing thread will be
* returned to PHP script
*
* @throws TransportException If there is an issue communicating with the transport
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ interface QueueBlockingReceiverInterface extends BlockingReceiverInterface
/**
* Pull messages from the specified queue names instead of consuming from all queues.
*
* @param string[] $queueNames
* @param callable(\AMQPEnvelope):?false $callback If callback return false, then processing thread will be
* returned to PHP script.
* @param string[] $queueNames
* @param callable(\AMQPEnvelope):?false $callback if callback return false, then processing thread will be
* returned to PHP script
*/
public function pullFromQueues(array $queueNames, callable $callback): void;
}
8 changes: 4 additions & 4 deletions src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public function run(array $options = []): void
$this->metadata->set(['queueNames' => $queueNames]);

if ($blockingMode) {
if (count($this->receivers) > 1) {
throw new RuntimeException('In blocking mode only one receiver is supported');
if (\count($this->receivers) > 1) {
throw new RuntimeException('In blocking mode only one receiver is supported.');
}

foreach ($this->receivers as $transportName => $receiver) {
Expand Down Expand Up @@ -130,10 +130,10 @@ public function run(array $options = []): void
};

if ($queueNames) {
/** @var QueueBlockingReceiverInterface $receiver */
/* @var QueueBlockingReceiverInterface $receiver */
$receiver->pullFromQueues($queueNames, $callback);
} else {
/** @var BlockingReceiverInterface $receiver */
/* @var BlockingReceiverInterface $receiver */
$receiver->pull($callback);
}
} else {
Expand Down
0