8000 feature #30708 [Messenger] ReceiverInterface::handle() to get() & Wor… · symfony/symfony@b12351a · GitHub
[go: up one dir, main page]

Skip to content

Commit b12351a

Browse files
committed
feature #30708 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports (weaverryan)
This PR was squashed before being merged into the 4.3-dev branch (closes #30708). Discussion ---------- [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | Helps with #30699 | License | MIT | Doc PR | TODO Highlights: * `messenger:consume` can now consume messages from multiple transports with priority ❗️ ``` bin/console messenger:consume amqp_high amqp_medium amqp_low ``` * How long you want to sleep before checking more messages is now an option to `messenger:consume` * `ReceiverInterface::receive()` is replaced with `ReceiverInterface::get()` * Logic for looping & sleeping is moved into `Worker` Commits ------- e800bd5 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
2 parents 2389d7c + e800bd5 commit b12351a

28 files changed

+917
-804
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ CHANGELOG
1111
to the `Envelope` then find the correct bus when receiving from
1212
the transport. See `ConsumeMessagesCommand`.
1313
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
14-
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
15-
`ack()` and `reject()`.
14+
* [BC BREAK] 3 new methods were added to `ReceiverInterface`:
15+
`ack()`, `reject()` and `get()`. The methods `receive()`
16+
and `stop()` were removed.
1617
* [BC BREAK] Error handling was moved from the receivers into
1718
`Worker`. Implementations of `ReceiverInterface::handle()`
1819
should now allow all exceptions to be thrown, except for transport
@@ -24,7 +25,9 @@ CHANGELOG
2425
* The default command name for `ConsumeMessagesCommand` was
2526
changed from `messenger:consume-messages` to `messenger:consume`
2627
* `ConsumeMessagesCommand` has two new optional constructor arguments
27-
* `Worker` has 4 new option constructor arguments.
28+
* [BC BREAK] The first argument to Worker changed from a single
29+
`ReceiverInterface` to an array of `ReceiverInterface`.
30+
* `Worker` has 3 new optional constructor arguments.
2831
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
2932
receiver no longer needs to call this.
3033
* The `AmqpSender` will now retry messages using a dead-letter exchange

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
use Symfony\Component\Console\Input\InputInterface;
2020
use Symfony\Component\Console\Input\InputOption;
2121
use Symfony\Component\Console\Output\OutputInterface;
22+
use Symfony\Component\Console\Question\ChoiceQuestion;
2223
use Symfony\Component\Console\Style\SymfonyStyle;
2324
use Symfony\Component\Messenger\RoutableMessageBus;
24-
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
25-
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
26-
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
2725
use Symfony\Component\Messenger\Worker;
26+
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
27+
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
28+
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
2829
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
2930

3031
/**
@@ -70,10 +71,11 @@ protected function configure(): void
7071

7172
$this
7273
->setDefinition([
73-
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
74+
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
7475
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
7576
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
7677
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
78+
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
7779
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
7880
])
7981
->setDescription('Consumes messages')
@@ -82,6 +84,10 @@ protected function configure(): void
8284
8385
<info>php %command.full_name% <receiver-name></info>
8486
87+
To receive from multiple transports, pass each name:
88+
89+
<info>php %command.full_name% receiver1 receiver2</info>
90+
8591
Use the --limit option to limit the number of messages received:
8692
8793
<info>php %command.full_name% <receiver-name> --limit=10</info>
@@ -111,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
111117
{
112118
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
113119

114-
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
115-
if (null === $receiverName) {
116-
$io->block('Missing receiver argument.', null, 'error', ' ', true);
117-
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
118-
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
119-
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
120-
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
121-
$input->setArgument('receiver', $alternatives[0]);
122-
}
120+
if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
121+
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
122+
123+
$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
124+
if (\count($this->receiverNames) > 1) {
125+
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
123126
}
127+
128+
$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
129+
$question->setMultiselect(true);
130+
131+
$input->setArgument('receivers', $io->askQuestion($question));
132+
}
133+
134+
if (0 === \count($input->getArgument('receivers'))) {
135+
throw new RuntimeException('Please pass at least one receiver.');
124136
}
125137
}
126138

@@ -135,41 +147,51 @@ protected function execute(InputInterface $input, OutputInterface $output): void
135147
$output->writeln(sprintf('<comment>%s</comment>', $message));
136148
}
137149

138-
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
139-
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
140-
}
150+
$receivers = [];
151+
$retryStrategies = [];
152+
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
153+
if (!$this->receiverLocator->has($receiverName)) {
154+
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
155+
if ($this->receiverNames) {
156+
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
157+
}
141158

142-
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
143-
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
144-
}
159+
throw new RuntimeException($message);
160+
}
145161

146-
$receiver = $this->receiverLocator->get($receiverName);
147-
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
162+
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
163+
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
164+
}
165+
166+
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
167+
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
168+
}
148169

149170
if (null !== $input->getOption('bus')) {
150171
$bus = $this->busLocator->get($input->getOption('bus'));
151172
} else {
152173
$bus = new RoutableMessageBus($this->busLocator);
153174
}
154175

176+
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
155177
$stopsWhen = [];
156178
if ($limit = $input->getOption('limit')) {
157179
$stopsWhen[] = "processed {$limit} messages";
158-
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
180+
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
159181
}
160182

161183
if ($memoryLimit = $input->getOption('memory-limit')) {
162184
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
163-
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
185+
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
164186
}
165187

166188
if ($timeLimit = $input->getOption('time-limit')) {
167189
$stopsWhen[] = "been running for {$timeLimit}s";
168-
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
190+
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
169191
}
170192

171193
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
172-
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
194+
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
173195

174196
if ($stopsWhen) {
175197
$last = array_pop($stopsWhen);
@@ -183,8 +205,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void
183205
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
184206
}
185207

186-
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
187-
$worker->run();
208+
$worker->run([
209+
'sleep' => $input->getOption('sleep') * 1000000,
210+
]);
188211
}
189212

190213
private function convertToBytes(string $memoryLimit): int

src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
2020
public function testConfigurationWithDefaultReceiver()
2121
{
2222
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
23-
$inputArgument = $command->getDefinition()->getArgument('receiver');
23+
$inputArgument = $command->getDefinition()->getArgument('receivers');
2424
$this->assertFalse($inputArgument->isRequired());
25-
$this->assertSame('amqp', $inputArgument->getDefault());
26-
}
27-
28-
public function testConfigurationWithoutDefaultReceiver()
29-
{
30-
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
31-
$inputArgument = $command->getDefinition()->getArgument('receiver');
32-
$this->assertTrue($inputArgument->isRequired());
33-
$this->assertNull($inputArgument->getDefault());
25+
$this->assertSame(['amqp'], $inputArgument->getDefault());
3426
}
3527
}

src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -612,11 +612,9 @@ public function __invoke(DummyMessage $message): void
612612

613613
class DummyReceiver implements ReceiverInterface
614614
{
615-
public function receive(callable $handler): void
615+
public function get(): iterable
616616
{
617-
for ($i = 0; $i < 3; ++$i) {
618-
$handler(new Envelope(new DummyMessage("Dummy $i")));
619-
}
617+
yield new Envelope(new DummyMessage('Dummy'));
620618
}
621619

622620
public function stop(): void

src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php

Lines changed: 0 additions & 48 deletions
This file was deleted.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
namespace Symfony\Component\Messenger\Tests\Fixtures;
4+
5+
use Symfony\Component\Messenger\WorkerInterface;
6+
7+
class DummyWorker implements WorkerInterface
8+
{
9+
private $isStopped = false;
10+
private $envelopesToReceive;
11+
private $envelopesHandled = 0;
12+
13+
public function __construct(array $envelopesToReceive)
14+
{
15+
$this->envelopesToReceive = $envelopesToReceive;
16+
}
17+
18+
public function run(array $options = [], callable $onHandledCallback = null): void
19+
{
20+
foreach ($this->envelopesToReceive as $envelope) {
21+
if (true === $this->isStopped) {
22+
break;
23+
}
24+
25+
if ($onHandledCallback) {
26+
$onHandledCallback($envelope);
27+
++$this->envelopesHandled;
28+
}
29+
}
30+
}
31+
32+
public function stop(): void
33+
{
34+
$this->isStopped = true;
35+
}
36+
37+
public function isStopped(): bool
38+
{
39+
return $this->isStopped;
40+
}
41+
42+
public function countEnvelopesHandled()
43+
{
44+
return $this->envelopesHandled;
45+
}
46+
}

0 commit comments

Comments
 (0)
0