8000 feature #30708 [Messenger] ReceiverInterface::handle() to get() & Wor… · symfony/symfony@b12351a · GitHub
[go: up one dir, main page]

Skip to content
  • Commit b12351a

    Browse files
    committed
    feature #30708 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports (weaverryan)
    This PR was squashed before being merged into the 4.3-dev branch (closes #30708). Discussion ---------- [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | Helps with #30699 | License | MIT | Doc PR | TODO Highlights: * `messenger:consume` can now consume messages from multiple transports with priority ❗️ ``` bin/console messenger:consume amqp_high amqp_medium amqp_low ``` * How long you want to sleep before checking more messages is now an option to `messenger:consume` * `ReceiverInterface::receive()` is replaced with `ReceiverInterface::get()` * Logic for looping & sleeping is moved into `Worker` Commits ------- e800bd5 [Messenger] ReceiverInterface::handle() to get() & Worker with prioritized transports
    2 parents 2389d7c + e800bd5 commit 
    8000
    b12351a

    28 files changed

    +917
    -804
    lines changed

    src/Symfony/Component/Messenger/CHANGELOG.md

    Lines changed: 6 additions & 3 deletions
    Original file line numberDiff line numberDiff line change
    @@ -11,8 +11,9 @@ CHANGELOG
    1111
    to the `Envelope` then find the correct bus when receiving from
    1212
    the transport. See `ConsumeMessagesCommand`.
    1313
    * The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
    14-
    * [BC BREAK] 2 new methods were added to `ReceiverInterface`:
    15-
    `ack()` and `reject()`.
    14+
    * [BC BREAK] 3 new methods were added to `ReceiverInterface`:
    15+
    `ack()`, `reject()` and `get()`. The methods `receive()`
    16+
    and `stop()` were removed.
    1617
    * [BC BREAK] Error handling was moved from the receivers into
    1718
    `Worker`. Implementations of `ReceiverInterface::handle()`
    1819
    should now allow all exceptions to be thrown, except for transport
    @@ -24,7 +25,9 @@ CHANGELOG
    2425
    * The default command name for `ConsumeMessagesCommand` was
    2526
    changed from `messenger:consume-messages` to `messenger:consume`
    2627
    * `ConsumeMessagesCommand` has two new optional constructor arguments
    27-
    * `Worker` has 4 new option constructor arguments.
    28+
    * [BC BREAK] The first argument to Worker changed from a single
    29+
    `ReceiverInterface` to an array of `ReceiverInterface`.
    30+
    * `Worker` has 3 new optional constructor arguments.
    2831
    * The `Worker` class now handles calling `pcntl_signal_dispatch()` the
    2932
    receiver no longer needs to call this.
    3033
    * The `AmqpSender` will now retry messages using a dead-letter exchange

    src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

    Lines changed: 50 additions & 27 deletions
    Original file line numberDiff line numberDiff line change
    @@ -19,12 +19,13 @@
    1919
    use Symfony\Component\Console\Input\InputInterface;
    2020
    use Symfony\Component\Console\Input\InputOption;
    2121
    use Symfony\Component\Console\Output\OutputInterface;
    22+
    use Symfony\Component\Console\Question\ChoiceQuestion;
    2223
    use Symfony\Component\Console\Style\SymfonyStyle;
    2324
    use Symfony\Component\Messenger\RoutableMessageBus;
    24-
    use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
    25-
    use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
    26-
    use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
    2725
    use Symfony\Component\Messenger\Worker;
    26+
    use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
    27+
    use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
    28+
    use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
    2829
    use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
    2930

    3031
    /**
    @@ -70,10 +71,11 @@ protected function configure(): void
    7071

    7172
    $this
    7273
    ->setDefinition([
    73-
    new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
    74+
    new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
    7475
    new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
    7576
    new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
    7677
    new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
    78+
    new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
    7779
    new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
    7880
    ])
    7981
    ->setDescription('Consumes messages')
    @@ -82,6 +84,10 @@ protected function configure(): void
    8284
    8385
    <info>php %command.full_name% <receiver-name></info>
    8486
    87+
    To receive from multiple transports, pass each name:
    88+
    89+
    <info>php %command.full_name% receiver1 receiver2</info>
    90+
    8591
    Use the --limit option to limit the number of messages received:
    8692
    8793
    <info>php %command.full_name% <receiver-name> --limit=10</info>
    @@ -111,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
    111117
    {
    112118
    $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
    113119

    114-
    if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
    115-
    if (null === $receiverName) {
    116-
    $io->block('Missing receiver argument.', null, 'error', ' ', true);
    117-
    $input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
    118-
    } elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
    119-
    $io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
    120-
    if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
    121-
    $input->setArgument('receiver', $alternatives[0]);
    122-
    }
    120+
    if ($this->receiverNames && 0 === \count($input->getArgument('receivers'))) {
    121+
    $io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
    122+
    123+
    $io->writeln('Choose which receivers you want to consume messages from in order of priority.');
    124+
    if (\count($this->receiverNames) > 1) {
    125+
    $io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
    123126
    }
    127+
    128+
    $question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
    129+
    $question->setMultiselect(true);
    130+
    131+
    $input->setArgument('receivers', $io->askQuestion($question));
    132+
    }
    133+
    134+
    if (0 === \count($input->getArgument('receivers'))) {
    135+
    throw new RuntimeException('Please pass at least one receiver.');
    124136
    }
    125137
    }
    126138

    @@ -135,41 +147,51 @@ protected function execute(InputInterface $input, OutputInterface $output): void
    135147
    $output->writeln(sprintf('<comment>%s</comment>', $message));
    136148
    }
    137149

    138-
    if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
    139-
    throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
    140-
    }
    150+
    $receivers = [];
    151+
    $retryStrategies = [];
    152+
    foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
    153+
    if (!$this->receiverLocator->has($receiverName)) {
    154+
    $message = sprintf('The receiver "%s" does not exist.', $receiverName);
    155+
    if ($this->receiverNames) {
    156+
    $message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
    157+
    }
    141158

    142-
    if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
    143-
    throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
    144-
    }
    159+
    throw new RuntimeException($message);
    160+
    }
    145161

    146-
    $receiver = $this->receiverLocator->get($receiverName);
    147-
    $retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
    162+
    if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
    163+
    throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
    164+
    }
    165+
    166+
    $receivers[$receiverName] = $this->receiverLocator->get($receiverName);
    167+
    $retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
    168+
    }
    148169

    149170
    if (null !== $input->getOption('bus')) {
    150171
    $bus = $this->busLocator->get($input->getOption('bus'));
    151172
    } else {
    152173
    $bus = new RoutableMessageBus($this->busLocator);
    153174
    }
    154175

    176+
    $worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
    155177
    $stopsWhen = [];
    156178
    if ($limit = $input->getOption('limit')) {
    157179
    $stopsWhen[] = "processed {$limit} messages";
    158-
    $receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
    180+
    $worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
    159181
    }
    160182

    161183
    if ($memoryLimit = $input->getOption('memory-limit')) {
    162184
    $stopsWhen[] = "exceeded {$memoryLimit} of memory";
    163-
    $receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
    185+
    $worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
    164186
    }
    165187

    166188
    if ($timeLimit = $input->getOption('time-limit')) {
    167189
    $stopsWhen[] = "been running for {$timeLimit}s";
    168-
    $receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
    190+
    $worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
    169191
    }
    170192

    171193
    $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
    172-
    $io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
    194+
    $io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
    173195

    174196
    if ($stopsWhen) {
    175197
    $last = array_pop($stopsWhen);
    @@ -183,8 +205,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void
    183205
    $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
    184206
    }
    185207

    186-
    $worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
    187-
    $worker->run();
    208+
    $worker->run([
    209+
    'sleep' => $input->getOption('sleep') * 1000000,
    210+
    ]);
    188211
    }
    189212

    190213
    private function convertToBytes(string $memoryLimit): int

    src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

    Lines changed: 2 additions & 10 deletions
    Original file line numberDiff line numberDiff line change
    @@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
    2020
    public function testConfigurationWithDefaultReceiver()
    2121
    {
    2222
    $command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
    23-
    $inputArgument = $command->getDefinition()->getArgument('receiver');
    23+
    $inputArgument = $command->getDefinition()->getArgument('receivers');
    2424
    $this->assertFalse($inputArgument->isRequired());
    25-
    $this->assertSame('amqp', $inputArgument->getDefault());
    26-
    }
    27-
    28-
    public function testConfigurationWithoutDefaultReceiver()
    29-
    {
    30-
    $command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
    31-
    $inputArgument = $command->getDefinition()->getArgument('receiver');
    32-
    $this->assertTrue($inputArgument->isRequired());
    33-
    $this->assertNull($inputArgument->getDefault());
    25+
    $this->assertSame(['amqp'], $inputArgument->getDefault());
    3426
    }
    3527
    }

    src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

    Lines changed: 2 additions & 4 deletions
    Original file line numberDiff line numberDiff line change
    @@ -612,11 +612,9 @@ public function __invoke(DummyMessage $message): void
    612612

    613613
    class DummyReceiver implements ReceiverInterface
    614614
    {
    615-
    public function receive(callable $handler): void
    615+
    public function get(): iterable
    616616
    {
    617-
    for ($i = 0; $i < 3; ++$i) {
    618-
    $handler(new Envelope(new DummyMessage("Dummy $i")));
    619-
    }
    617+
    yield new Envelope(new DummyMessage('Dummy'));
    620618
    }
    621619

    622620
    public function stop(): void

    src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php

    Lines changed: 0 additions & 48 deletions
    This file was deleted.
    Lines changed: 46 additions & 0 deletions
    Original file line numberDiff line numberDiff line change
    @@ -0,0 +1,46 @@
    1+
    <?php
    2+
    3+
    namespace Symfony\Component\Messenger\Tests\Fixtures;
    4+
    5+
    use Symfony\Component\Messenger\WorkerInterface;
    6+
    7+
    class DummyWorker implements WorkerInterface
    8+
    {
    9+
    private $isStopped = false;
    10+
    private $envelopesToReceive;
    11+
    private $envelopesHandled = 0;
    12+
    13+
    public function __construct(array $envelopesToReceive)
    14< B83A code class="diff-text syntax-highlighted-line addition">+
    {
    15+
    $this->envelopesToReceive = $envelopesToReceive;
    16+
    }
    17+
    18+
    public function run(array $options = [], callable $onHandledCallback = null): void
    19+
    {
    20+
    foreach ($this->envelopesToReceive as $envelope) {
    21+
    if (true === $this->isStopped) {
    22+
    break;
    23+
    }
    24+
    25+
    if ($onHandledCallback) {
    26+
    $onHandledCallback($envelope);
    27+
    ++$this->envelopesHandled;
    28+
    }
    29+
    }
    30+
    }
    31+
    32+
    public function stop(): void
    33+
    {
    34+
    $this->isStopped = true;
    35+
    }
    36+
    37+
    public function isStopped(): bool
    38+
    {
    39+
    return $this->isStopped;
    40+
    }
    41+
    42+
    public function countEnvelopesHandled()
    43+
    {
    44+
    return $this->envelopesHandled;
    45+
    }
    46+
    }

    0 commit comments

    Comments
     (0)
    0