8000 [Messenger] add welcome notice when running the command · symfony/symfony@673b58b · GitHub
[go: up one dir, main page]

Skip to content

Commit 673b58b

Browse files
[Messenger] add welcome notice when running the command
1 parent 18cd342 commit 673b58b

File tree

2 files changed

+32
-11
lines changed

2 files changed

+32
-11
lines changed

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,15 @@ protected function configure(): void
9494
*/
9595
protected function interact(InputInterface $input, OutputInterface $output)
9696
{
97-
$style = new SymfonyStyle($input, $output);
97+
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
9898

9999
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
100100
if (null === $receiverName) {
101-
$style->block('Missing receiver argument.', null, 'error', ' ', true);
102-
$input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames));
101+
$io->block('Missing receiver argument.', null, 'error', ' ', true);
102+
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
103103
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
104-
$style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
105-
if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
104+
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
105+
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
106106
$input->setArgument('receiver', $alternatives[0]);
107107
}
108108
}
@@ -111,17 +111,17 @@ protected function interact(InputInterface $input, OutputInterface $output)
111111
$busName = $input->getOption('bus');
112112
if ($this->busNames && !$this->busLocator->has($busName)) {
113113
if (null === $busName) {
114-
$style->block('Missing bus argument.', null, 'error', ' ', true);
115-
$input->setOption('bus', $style->choice('Select one of the available buses', $this->busNames));
114+
$io->block('Missing bus argument.', null, 'error', ' ', true);
115+
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
116116
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
117-
$style->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
117+
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
118118

119119
if (1 === \count($alternatives)) {
120-
if ($style->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
120+
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
121121
$input->setOption('bus', $alternatives[0]);
122122
}
123123
} else {
124-
$input->setOption('bus', $style->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
124+
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
125125
}
126126
}
127127
}
@@ -143,18 +143,37 @@ protected function execute(InputInterface $input, OutputInterface $output): void
143143
$receiver = $this->receiverLocator->get($receiverName);
144144
$bus = $this->busLocator->get($busName);
145145

146+
$stopsWhen = [];
146147
if ($limit = $input->getOption('limit')) {
148+
$stopsWhen[] = "processed {$limit} messages";
147149
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
148150
}
149151

150152
if ($memoryLimit = $input->getOption('memory-limit')) {
153+
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
151154
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
152155
}
153156

154157
if ($timeLimit = $input->getOption('time-limit')) {
158+
$stopsWhen[] = "been running for {$timeLimit}s";
155159
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
156160
}
157161

162+
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
163+
$io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
164+
165+
if ($stopsWhen) {
166+
$last = array_pop($stopsWhen);
167+
$stopsWhen = ($stopsWhen ? implode(', ', $stopsWhen).' or ' : '').$last;
168+
$io->comment("The worker will automatically exit once it has {$stopsWhen}.");
169+
}
170+
171+
$io->comment('Quit the worker with CONTROL-C.');
172+
173+
if (!$output->isDebug()) {
174+
$io->comment('Re-run the command with a -vvv option to see logs about consumed messages.');
175+
}
176+
158177
$worker = new Worker($receiver, $bus);
159178
$worker->run();
160179
}
@@ -171,7 +190,7 @@ private function convertToBytes(string $memoryLimit): int
171190
$max = (int) $max;
172191
}
173192

174-
switch (substr($memoryLimit, -1)) {
193+
switch (substr(rtrim($memoryLimit, 'b'), -1)) {
175194
case 't': $max *= 1024;
176195
// no break
177196
case 'g': $max *= 1024;

src/Symfony/Component/Messenger/Worker.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
* @author Samuel Roze <samuel.roze@gmail.com>
1919
*
2020
* @experimental in 4.2
21+
*
22+
* @final
2123
*/
2224
class Worker
2325
{

0 commit comments

Comments
 (0)
0