8000 [Messenger] [WIP] Be able to start a worker for multiple queues with custom consumption priorities by d-ph · Pull Request #47602 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] [WIP] Be able to start a worker for multiple queues with custom consumption priorities #47602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 7.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
#45882 [Messenger] Be able to start a worker for multiple queues with…
… custom consumption priorities.
  • Loading branch information
d-ph committed Sep 16, 2022
commit b62a53c1c665141b9369aa94af4f0a3975d99b48
Original file line number Diff li 10000 ne number Diff line change
Expand Up @@ -2183,6 +2183,11 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

$sendersServiceLocator = ServiceLocatorTagPass::register($container, $senderReferences);

$workerExecutionStrategyRegistry = $container->getDefinition('messenger.worker_execution_strategy.registry');
foreach ($container->findTaggedServiceIds('messenger.worker_execution_strategy', true) as $serviceId => $unused) {
$workerExecutionStrategyRegistry->addMethodCall('registerStrategy', [$container->findDefinition($serviceId)->getClass()]);
}

$container->getDefinition('messenger.senders_locator')
->replaceArgument(0, $messageToSendersMapping)
->replaceArgument(1, $sendersServiceLocator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
abstract_arg('Routable message bus'),
service('messenger.receiver_locator'),
service('event_dispatcher'),
service('messenger.worker_execution_strategy.registry'),
service('logger')->nullOnInvalid(),
[], // Receiver names
service('messenger.listener.reset_services')->nullOnInvalid(),
Expand Down
11 changes: 11 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory;
use Symfony\Component\Messenger\Transport\TransportFactory;
use Symfony\Component\Messenger\WorkerExecution\DefaultWorkerExecutionStrategy;
use Symfony\Component\Messenger\WorkerExecution\RankedWorkerExecutionStrategy;
use Symfony\Component\Messenger\WorkerExecution\WorkerExecutionStrategyRegistry;

return static function (ContainerConfigurator $container) {
$container->services()
Expand Down Expand Up @@ -216,5 +219,13 @@
abstract_arg('message bus locator'),
service('messenger.default_bus'),
])

->set('messenger.worker_execution_strategy.registry', WorkerExecutionStrategyRegistry::class)

->set('messenger.worker_execution_strategy.default', DefaultWorkerExecutionStrategy::class)
->tag('messenger.worker_execution_strategy')

->set('messenger.worker_execution_strategy.ranked', RankedWorkerExecutionStrategy::class)
->tag('messenger.worker_exec 8000 ution_strategy')
;
};
17 changes: 15 additions & 2 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\WorkerExecution\DefaultWorkerExecutionStrategy;
use Symfony\Component\Messenger\WorkerExecution\WorkerExecutionStrategyRegistry;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
Expand All @@ -48,8 +50,9 @@ class ConsumeMessagesCommand extends Command
private ?ResetServicesListener $resetServicesListener;
private array $busIds;
private ?ContainerInterface $rateLimiterLocator;
private WorkerExecutionStrategyRegistry $strategyRegistry;

public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null)
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, WorkerExecutionStrategyRegistry $strategyRegistry, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null)
{
$this->routableBus = $routableBus;
$this->receiverLocator = $receiverLocator;
Expand All @@ -59,6 +62,7 @@ public function __construct(RoutableMessageBus $routableBus, ContainerInterface
$this->resetServicesListener = $resetServicesListener;
$this->busIds = $busIds;
$this->rateLimiterLocator = $rateLimiterLocator;
$this->strategyRegistry = $strategyRegistry;

parent::__construct();
}
Expand All @@ -78,6 +82,8 @@ protected function configure(): void
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
new InputOption('strategy', null, InputOption::VALUE_REQUIRED, 'Execution strategy', DefaultWorkerExecutionStrategy::getAlias()),
new InputOption('strategy-config', null, InputOption::VALUE_REQUIRED, 'Json-encoded custom config for the strategy. See the chosen strategy for info', '{}'),
])
->setHelp(<<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
Expand Down Expand Up @@ -210,7 +216,14 @@ protected function execute(InputInterface $input, OutputInterface $output): int

$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters);
$strategyAlias = $input->getOption('strategy');
$strategyConfig = json_decode($input->getOption('strategy-config'), true);
if ($strategyConfig === null) {
throw new RuntimeException('Could not json-decode the value of the --strategy-config parameter');
}
$strategy = $this->strategyRegistry->createStrategy($strategyAlias, $strategyConfig);

$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters, $strategy);
$options = [
'sleep' => $input->getOption('sleep') * 1000000,
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,9 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
$consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus'));
}

$consumeCommandDefinition->replaceArgument(4, array_values($receiverNames));
$consumeCommandDefinition->replaceArgument(5, array_values($receiverNames));
try {
$consumeCommandDefinition->replaceArgument(6, $busIds);
$consumeCommandDefinition->replaceArgument(7, $busIds);
} catch (OutOfBoundsException) {
// ignore to preserve compatibility with symfony/framework-bundle < 5.4
}
Expand Down
64 changes: 35 additions & 29 deletions src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\RateLimiter\LimiterInterface;
use Symfony\Component\Messenger\WorkerExecution\WorkerExecutionStrategyContext;
use Symfony\Component\Messenger\WorkerExecution\WorkerExecutionStrategyInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
Expand All @@ -49,11 +51,12 @@ class Worker
private array $acks = [];
private \SplObjectStorage $unacks;
private ?array $rateLimiters;
private WorkerExecutionStrategyInterface $executionStrategy;

/**
* @param ReceiverInterface[] $receivers Where the key is the transport name
*/
public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null, array $rateLimiters = null)
public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null, array $rateLimiters = null, WorkerExecutionStrategyInterface $executionStrategy)
{
$this->receivers = $receivers;
$this->bus = $bus;
Expand All @@ -64,6 +67,7 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis
]);
$this->unacks = new \SplObjectStorage();
$this->rateLimiters = $rateLimiters;
$this->executionStrategy = $executionStrategy;
}

/**
Expand Down Expand Up @@ -93,35 +97,13 @@ public function run(array $options = []): void
}
}

$workerExecutionContext = new WorkerExecutionStrategyContext($this, $queueNames ?: []);

while (!$this->shouldStop) {
$envelopeHandled = false;
$envelopeHandledStart = microtime(true);
foreach ($this->receivers as $transportName => $receiver) {
if ($queueNames) {
$envelopes = $receiver->getFromQueues($queueNames);
} else {
$envelopes = $receiver->get();
}

foreach ($envelopes as $envelope) {
$envelopeHandled = true;

$this->rateLimit($transportName);
$this->handleMessage($envelope, $transportName);
$this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false));

if ($this->shouldStop) {
break 2;
}
}

// after handling a single receiver, quit and start the loop again
// this should prevent multiple lower priority receivers from
// blocking too long before the higher priority are checked
if ($envelopeHandled) {
break;
}
}
$result = $this->executionStrategy->processQueueTasks($workerExecutionContext);
$envelopeHandled = $result->wereEnvelopesHandled;

if (!$envelopeHandled && $this->flush(false)) {
continue;
Expand All @@ -140,7 +122,10 @@ public function run(array $options = []): void
$this->eventDispatcher?->dispatch(new WorkerStoppedEvent($this));
}

private function handleMessage(Envelope $envelope, string $transportName): void
/**
* @internal
*/
public function handleMessage(Envelope $envelope, string $transportName): void
{
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
$this->eventDispatcher?->dispatch($event);
Expand Down Expand Up @@ -222,7 +207,10 @@ private function ack(): bool
return (bool) $acks;
}

private function rateLimit(string $transportName): void
/**
* @internal
*/
public function rateLimit(string $transportName): void
{
if (!$this->rateLimiters) {
return;
Expand Down Expand Up @@ -279,4 +267,22 @@ public function getMetadata(): WorkerMetadata
{
return $this->metadata;
}

public function getEventDispatcher(): ?EventDispatcherInterface
{
return $this->eventDispatcher;
}

public function getShouldStop(): bool
{
return $this->shouldStop;
}

/**
* @return ReceiverInterface[]
*/
public function getReceivers(): array
{
return $this->receivers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace Symfony\Component\Messenger\WorkerExecution;

final class DefaultWorkerExecutionStrategy implements WorkerExecutionStrategyInterface
{
public function __construct(array $options)
{
}

public static function getAlias(): string
{
return 'default';
}

public function processQueueTasks(WorkerExecutionStrategyContext $context): WorkerExecutionStrategyResult
{
$envelopeHandled = false;

foreach ($context->getReceivers() as $transportName => $receiver) {
if ($context->getQueueNames()) {
$envelopes = $receiver->getFromQueues($context->getQueueNames());
} else {
$envelopes = $receiver->get();
}

foreach ($envelopes as $envelope) {
$envelopeHandled = true;

$result = $context->handleMessage($envelope, $transportName);

if ($result->shouldStop) {
break 2;
}
}

// after handling a single receiver, quit and start the loop again
// this should prevent multiple lower priority receivers from
// blocking too long before the higher priority are checked
if ($envelopeHandled) {
break;
}
}

return new WorkerExecutionStrategyResult($envelopeHandled);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<?php

namespace Symfony\Component\Messenger\WorkerExecution;

use RuntimeException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

/**
* Example worker command:
*
* bin/console messenger:consume queue_a queue_b queue_b_low queue_b_lowest queue_c \
* --strategy="com.symfony.ranked" \
* --strategy-config="{\"ranks\": [1, 1, 2, 3, 1]}"
*
* The command above will result in the following execution pattern:
*
* 1. (queue_a, queue_b, queue_c)
* 2. queue_b_low
* 3. queue_b_lowest
*/
final class RankedWorkerExecutionStrategy implements WorkerExecutionStrategyInterface
{
/**
* * ranks: int[]. Queue receivers with the same rank integer value will form a single group. Groups are executed
* from lowest to highest integer value.
*/
private array $options;

public function __construct(array $options)
{
$this->options = $options;

$this->validateOptions($options);
}

private function validateOptions(array $options): void
{
if (!array_key_exists('ranks', $options) || !is_array($options['ranks'])) {
throw new RuntimeException('Invalid worker ranked strategy options. Missing key "ranks" or it is not an array');
}

foreach ($options['ranks'] as $rankNumber) {
if (!is_int($rankNumber)) {
throw new RuntimeException('Invalid worker ranked strategy options. One of the ranks is not an integer');
}
}
}

public static function getAlias(): string
{
return 'com.symfony.ranked';
}

public function processQueueTasks(WorkerExecutionStrategyContext $context): WorkerExecutionStrategyResult
{
$envelopeHandled = false;

foreach ($this->groupReceiversByRanks($context->getReceivers()) as $receiversGroup) {
foreach ($receiversGroup as $transportName => $receiver) {
if ($context->getQueueNames()) {
$envelopes = $receiver->getFromQueues($context->getQueueNames());
} else {
$envelopes = $receiver->get();
}

foreach ($envelopes as $envelope) {
$envelopeHandled = true;

$result = $context->handleMessage($envelope, $transportName);

if ($result->shouldStop) {
break 3;
}
}
}

// after handling a single grouped rank of receivers, quit and start the loop again
// this should prevent multiple lower priority receivers from
// blocking too long before the higher priority are checked
if ($envelopeHandled) {
break;
}
}

return new WorkerExecutionStrategyResult($envelopeHandled);
}

/**
* @param ReceiverInterface[] $receivers Where the key is the transport name
* @return array<int, array<string, ReceiverInterface>> Ordered groups of receivers by ranks number
*/
private function groupReceiversByRanks(array $receivers): array
{
$receiversRanks = $this->options['ranks'];
$receiversValues = array_values($receivers);
$receiversKeys = array_keys($receivers);

if (count($receiversRanks) !== count($receivers)) {
throw new RuntimeException('Worker ranked strategy: The count of queue receivers does not match the count of their ranks');
}

/**
* @var array<int, array<string, ReceiverInterface>> $receiversGroupedByRanks
*/
$receiversGroupedByRanks = [];
foreach ($receiversValues as $index => $receiver) {
$receiversGroupedByRanks[(int) $receiversRanks[$index]][$receiversKeys[$index]] = $receiver;
}

uksort($receiversGroupedByRanks, static function ($rankA, $rankB) {
return $rankA <=> $rankB;
});

return $receiversGroupedByRanks;
}
}
Loading
0