8000 Adding the ability to consume multiple transports in one Worker · symfony/symfony@8866804 · GitHub
[go: up one dir, main page]

Skip to content

Commit 8866804

Browse files
committed
Adding the ability to consume multiple transports in one Worker
1 parent bf89cd6 commit 8866804

15 files changed

+189
-102
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ CHANGELOG
2323
* The default command name for `ConsumeMessagesCommand` was
2424
changed from `messenger:consume-messages` to `messenger:consume`
2525
* `ConsumeMessagesCommand` has two new optional constructor arguments
26-
* `Worker` has 4 new option constructor arguments.
26+
* [BC BREAK] The first argument to Worker changed from a single
27+
`ReceiverInterface` to an array of `ReceiverInterface`.
28+
* `Worker` has 3 new optional constructor arguments.
2729
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
2830
receiver no longer needs to call this.
2931
* The `AmqpSender` will now retry messages using a dead-letter exchange

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Symfony\Component\Console\Input\InputInterface;
2020
use Symfony\Component\Console\Input\InputOption;
2121
use Symfony\Component\Console\Output\OutputInterface;
22+
use Symfony\Component\Console\Question\ChoiceQuestion;
2223
use Symfony\Component\Console\Style\SymfonyStyle;
2324
use Symfony\Component\Messenger\RoutableMessageBus;
2425
use Symfony\Component\Messenger\Worker;
@@ -70,7 +71,7 @@ protected function configure(): void
7071

7172
$this
7273
->setDefinition([
73-
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
74+
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
7475
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
7576
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
7677
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
@@ -83,6 +84,10 @@ protected function configure(): void
8384
8485
<info>php %command.full_name% <receiver-name></info>
8586
87+
To receive from multiple transports, pass each name:
88+
89+
<info>php %command.full_name% receiver1 receiver2</info>
90+
8691
Use the --limit option to limit the number of messages received:
8792
8893
<info>php %command.full_name% <receiver-name> --limit=10</info>
@@ -112,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
112117
{
113118
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
114119

115-
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
116-
if (null === $receiverName) {
117-
$io->block('Missing receiver argument.', null, 'error', ' ', true);
118-
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
119-
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
120-
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
121-
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
122-
$input->setArgument('receiver', $alternatives[0]);
123-
}
120+
if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
121+
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
122+
123+
$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
124+
if (\count($this->receiverNames) > 1) {
125+
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
124126
}
127+
128+
$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
129+
$question->setMultiselect(true);
130+
131+
$input->setArgument('receivers', $io->askQuestion($question));
132+
}
133+
134+
if (0 === \count($input->getArgument('receivers'))) {
135+
throw new RuntimeException('Please pass at least one receiver.');
125136
}
126137
}
127138

@@ -136,24 +147,33 @@ protected function execute(InputInterface $input, OutputInterface $output): void
136147
$output->writeln(sprintf('<comment>%s</comment>', $message));
137148
}
138149

139-
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
140-
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
141-
}
150+
$receivers = [];
151+
$retryStrategies = [];
152+
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
153+
if (!$this->receiverLocator->has($receiverName)) {
154+
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
155+
if ($this->receiverNames) {
156+
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
157+
}
142158

143-
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
144-
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
145-
}
159+
throw new RuntimeException($message);
160+
}
146161

147-
$receiver = $this->receiverLocator->get($receiverName);
148-
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
162+
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
163+
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
164+
}
165+
166+
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
167+
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
168+
}
149169

150170
if (null !== $input->getOption('bus')) {
151171
$bus = $this->busLocator->get($input->getOption('bus'));
152172
} else {
153173
$bus = new RoutableMessageBus($this->busLocator);
154174
}
155175

156-
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
176+
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
157177
$stopsWhen = [];
158178
if ($limit = $input->getOption('limit')) {
159179
$stopsWhen[] = "processed {$limit} messages";
@@ -171,7 +191,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
171191
}
172192

173193
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
174-
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
194+
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
175195

176196
if ($stopsWhen) {
177197
$last = array_pop($stopsWhen);
@@ -186,7 +206,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
186206
}
187207

188208
$worker->run([
189-
'sleep' => $input->getOption('sleep') * 1000000
209+
'sleep' => $input->getOption('sleep') * 1000000,
190210
]);
191211
}
192212

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
2020
public function testConfigurationWithDefaultReceiver()
2121
{
2222
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
23-
$inputArgument = $command->getDefinition()->getArgument('receiver');
23+
$inputArgument = $command->getDefinition()->getArgument('receivers');
2424
$this->assertFalse($inputArgument->isRequired());
25-
$this->assertSame('amqp', $inputArgument->getDefault());
26-
}
27-
28-
public function testConfigurationWithoutDefaultReceiver()
29-
{
30-
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
31-
$inputArgument = $command->getDefinition()->getArgument('receiver');
32-
$this->assertTrue($inputArgument->isRequired());
33-
$this->assertNull($inputArgument->getDefault());
25+
$this->assertSame(['amqp'], $inputArgument->getDefault());
3426
}
3527
}

src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public function __construct(array $envelopesToReceive)
1515
$this->envelopesToReceive = $envelopesToReceive;
1616
}
1717

18-
public function run(callable $onHandled 93C6 Callback = null): void
18+
public function run(array $options = [], callable $onHandledCallback = null): void
1919
{
2020
foreach ($this->envelopesToReceive as $envelope) {
2121
if (true === $this->isStopped) {

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,20 @@ public function testItSendsAndReceivesMessages()
5757
$sender->send($first = new Envelope(new DummyMessage('First')));
5858
$sender->send($second = new Envelope(new DummyMessage('Second')));
5959

60-
$envelope = $receiver->get();
60+
$envelopes = iterator_to_array($receiver->get());
61+
$this->assertCount(1, $envelopes);
62+
/** @var Envelope $envelope */
63+
$envelope = $envelopes[0];
6164
$this->assertEquals($first->getMessage(), $envelope->getMessage());
6265
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
6366

64-
$envelope = $receiver->get();
65-
$this->assertEquals($envelope->getMessage(), $envelope->getMessage());
67+
$envelopes = iterator_to_array($receiver->get());
68+
$this->assertCount(1, $envelopes);
69+
/** @var Envelope $envelope */
70+
$envelope = $envelopes[0];
71+
$this->assertEquals($second->getMessage(), $envelope->getMessage());
6672

67-
$this->assertNull($receiver->get());
73+
$this->assertEmpty(iterator_to_array($receiver->get()));
6874
}
6975

7076
public function testRetryAndDelay()
@@ -80,20 +86,26 @@ public function testRetryAndDelay()
8086

8187
$sender->send($first = new Envelope(new DummyMessage('First')));
8288

83-
$envelope = $receiver->get();
89+
$envelopes = iterator_to_array($receiver->get());
90+
/** @var Envelope $envelope */
91+
$envelope = $envelopes[0];
8492
$newEnvelope = $envelope
8593
->with(new DelayStamp(2000))
8694
->with(new RedeliveryStamp(1, 'not_important'));
8795
$sender->send($newEnvelope);
8896
$receiver->ack($envelope);
8997

90-
$envelope = null;
98+
$envelopes = [];
9199
$startTime = time();
92100
// wait for next message, but only for max 3 seconds
93-
while (null === $envelope && $startTime + 3 > time()) {
94-
$envelope = $receiver->get();
101+
while (0 === \count($envelopes) && $startTime + 3 > time()) {
102+
$envelopes = iterator_to_array($receiver->get());
95103
}
96104

105+
$this->assertCount(1, $envelopes);
106+
/** @var Envelope $envelope */
107+
$envelope = $envelopes[0];
108+
97109
// should have a 2 second delay
98110
$this->assertGreaterThanOrEqual($startTime + 2, time());
99111
// but only a 2 second delay

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public function testItReturnsTheDecodedMessageToTheHandler()
3939
$connection->method('get')->willReturn($amqpEnvelope);
4040

4141
$receiver = new AmqpReceiver($connection, $serializer);
42-
$actualEnvelope = $receiver->get();
43-
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
42+
$actualEnvelopes = iterator_to_array($receiver->get());
43+
$this->assertCount(1, $actualEnvelopes);
44+
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
4445
}
4546

4647
/**

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public function testReceivesMessages()
4747
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
4848
$connection->method('get')->willReturn($amqpEnvelope);
4949

50-
$envelope = $transport->get();
51-
$this->assertSame($decodedMessage, $envelope->getMessage());
50+
$envelopes = iterator_to_array($transport->get());
51+
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5252
}
5353

5454
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
$receiver = new AmqpReceiver($connection, $serializer);
3333
$retryStrategy = new MultiplierRetryStrategy(3, 0);
3434

35-
$worker = new Worker($receiver, new class() implements MessageBusInterface {
35+
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
3636
public function dispatch($envelope): Envelope
3737
{
3838
echo 'Get envelope with message: '.\get_class($envelope->getMessage())."\n";
@@ -43,7 +43,7 @@ public function dispatch($envelope): Envelope
4343

4444
return $envelope;
4545
}
46-
}, 'the_receiver', $retryStrategy);
46+
});
4747

4848
echo "Receiving messages...\n";
4949
$worker->run();

src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $me
3737
};
3838

3939
$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver);
40-
$memoryLimitWorker->run($handledCallback);
40+
$memoryLimitWorker->run([], $handledCallback);
4141

4242
// handler should be called exactly 1 time
4343
$this->assertSame($handlerCalledTimes, 1);

src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public function testWorkerStopsWhenMaximumCountExceeded($max, $shouldStop)
3939
]);
4040

4141
$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max);
42-
$maximumCountWorker->run($handledCallback);
42+
$maximumCountWorker->run([], $handledCallback);
4343

4444
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
4545
}

src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public function testWorkerStopsWhenTimeLimitIsReached()
3434
->with('Worker stopped due to time limit of {timeLimit}s reac 5EC9 hed', ['timeLimit' => 1]);
3535

3636
$timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger);
37-
$timeoutWorker->run(function () {
37+
$timeoutWorker->run([], function () {
3838
sleep(2);
3939
});
4040

0 commit comments

Comments
 (0)
0