8000 limit consumer to specific queues with command argument · symfony/symfony@c1d54a8 · GitHub
[go: up one dir, main page]

Skip to content

Commit c1d54a8

Browse files
committed
limit consumer to specific queues with command argument
1 parent fb51ddc commit c1d54a8

File tree

4 files changed

+30
-35
lines changed

4 files changed

+30
-35
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1717
use Symfony\Component\Messenger\Exception\TransportException;
1818
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
19-
use Symfony\Component\Messenger\Transport\Receiver\QueueAwareInterface;
19+
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2020
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2121
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2222
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -26,16 +26,11 @@
2626
*
2727
* @author Samuel Roze <samuel.roze@gmail.com>
2828
*/
29-
class AmqpReceiver implements ReceiverInterface, QueueAwareInterface, MessageCountAwareInterface
29+
class AmqpReceiver implements ReceiverInterface, QueueReceiverInterface, MessageCountAwareInterface
3030
{
3131
private $serializer;
3232
private $connection;
3333

34-
/**
35-
* @var string[]
36-
*/
37-
private $queueNames;
38-
3934
public function __construct(Connection $connection, SerializerInterface $serializer = null)
4035
{
4136
$this->connection = $connection;
@@ -47,18 +42,14 @@ public function __construct(Connection $connection, SerializerInterface $seriali
4742
*/
4843
public function get(): iterable
4944
{
50-
if (!$this->queueNames) {
51-
$this->queueNames = $this->connection->getQueueNames();
52-
}
53-
54-
foreach ($this->queueNames as $queueName) {
55-
yield from $this->getEnvelope($queueName);
56-
}
45+
yield from $this->getFromQueues($this->connection->getQueueNames());
5746
}
5847

59-
public function setQueueNames(array $queueNames): void
48+
public function getFromQueues(array $queueNames): iterable
6049
{
61-
$this->queueNames = $queueNames;
50+
foreach ($queueNames as $queueName) {
51+
yield from $this->getEnvelope($queueName);
52+
}
6253
}
6354

6455
private function getEnvelope(string $queueName): iterable

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
2929
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
3030
use Symfony\Component\Messenger\RoutableMessageBus;
31-
use Symfony\Component\Messenger\Transport\Receiver\QueueAwareInterface;
31+
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
3232
use Symfony\Component\Messenger\Worker;
3333

3434
/**
@@ -71,7 +71,7 @@ protected function configure(): void
7171
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can handle new messages'),
7272
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
7373
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
74-
new InputOption('queue', null, InputOption::VALUE_REQUIRED|InputOption::VALUE_IS_ARRAY, 'To limit consuming to a only some queues (for receivers that support it)'),
74+
new InputOption('queues', null, InputOption::VALUE_REQUIRED|InputOption::VALUE_IS_ARRAY, 'To limit consuming to a only some queues (for receivers that support it)'),
7575
])
7676
->setDescription('Consumes messages')
7777
->setHelp(<<<'EOF'
@@ -198,20 +198,16 @@ protected function execute(InputInterface $input, OutputInterface $output)
198198
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
199199
}
200200

201-
if ($queues = $input->getOption('queue')) {
202-
foreach ($receivers as $receiver) {
203-
if (!$receiver instanceof QueueAwareInterface) {
204-
throw new \RuntimeException();
205-
}
206-
$receiver->setQueues($queues);
207-
}
208-
}
209201
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
210202

211203
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
212-
$worker->run([
204+
$options = [
213205
'sleep' => $input->getOption('sleep') * 1000000,
214-
]);
206+
];
207+
if ($queues = $input->getOption('queues')) {
208+
$options['queues'] = $queues;
209+
}
210+
$worker->run($options);
215211

216212
return 0;
217213
}

src/Symfony/Component/Messenger/Transport/Receiver/QueueAwareInterface.php renamed to src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,15 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\Receiver;
1313

14-
use Symfony\Component\Messenger\Envelope;
15-
use Symfony\Component\Messenger\Exception\TransportException;
16-
1714
/**
1815
* @author David Buchmann <mail@davidbu.ch>
1916
*/
20-
interface QueueAwareInterface
17+
interface QueueReceiverInterface
2118
{
2219
/**
23-
* Limit this receiver to the specified queue names instead of consuming from all queues.
20+
* Get messages from the specified queue names instead of consuming from all queues.
2421
*
2522
* @param string[] $queueNames
2623
*/
27-
public function setQueueNames(array $queueNames): void;
24+
public function getFromQueues(array $queueNames): iterable;
2825
}

src/Symfony/Component/Messenger/Worker.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
2525
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2626
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
27+
use Symfony\Component\Messenger\Transport\Receiver\QueueAwareInterface;
28+
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2729
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2830
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
2931

@@ -57,6 +59,7 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis
5759
*
5860
* Valid options are:
5961
* * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
62+
* * queues: The queue names to consume from, instead of consuming from all queues. All receivers must implement QueueReceiverInterface
6063
*/
6164
public function run(array $options = []): void
6265
{
@@ -65,11 +68,19 @@ public function run(array $options = []): void
6568
$options = array_merge([
6669
'sleep' => 1000000,
6770
], $options);
71+
$queueNames = $options['queues'] ?? false;
6872

6973
while (false === $this->shouldStop) {
7074
$envelopeHandled = false;
7175
foreach ($this->receivers as $transportName => $receiver) {
72-
$envelopes = $receiver->get();
76+
if ($queueNames) {
77+
if (!$receiver instanceof QueueReceiverInterface) {
78+
throw new \RuntimeException("Receiver $transportName does not implement ".QueueReceiverInterface::class);
79+
}
80+
$envelopes = $receiver->getFromQueues($queueNames);
81+
} else {
82+
$envelopes = $receiver->get();
83+
}
7384

7485
foreach ($envelopes as $envelope) {
7586
$envelopeHandled = true;

0 commit comments

Comments
 (0)
0