8000 [Messenger] implementation of messenger:consume, which processes mes… · symfony/symfony@c1d082a · GitHub
[go: up one dir, main page]

Skip to content

Commit c1d082a

Browse files
committed
[Messenger] implementation of messenger:consume, which processes messages concurrently
1 parent b167190 commit c1d082a

File tree

11 files changed

+234
-13
lines changed

11 files changed

+234
-13
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2158,7 +2158,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
21582158

21592159
if ($busId === $config['default_bus']) {
21602160
$container->setAlias('messenger.default_bus', $busId)->setPublic(true);
2161-
$container->setAlias(MessageBusInterface::class, $busId);
2161+
$container->setAlias(MessageBusInterface::class, $busId)->setPublic(true);
21622162
} else {
21632163
$container->registerAliasForArgument($busId, MessageBusInterface::class);
21642164
}

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
3434
use Symfony\Component\Messenger\Middleware\TraceableMiddleware;
3535
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
36+
use Symfony\Component\Messenger\ParallelMessageBus;
3637
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
3738
use Symfony\Component\Messenger\RoutableMessageBus;
3839
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory;
@@ -54,6 +55,7 @@
5455
abstract_arg('per message senders map'),
5556
abstract_arg('senders service locator'),
5657
])
58+
5759
->set('messenger.middleware.send_message', SendMessageMiddleware::class)
5860
->abstract()
5961
->args([
@@ -134,6 +136,13 @@
134136
])
135137
->tag('messenger.transport_factory')
136138

139+
->set('parallel_bus', ParallelMessageBus::class)
140+
->args([
141+
[],
142+
service('kernel'),
143+
])
144+
->tag('messenger.bus')
145+
137146
->set('messenger.transport.in_memory.factory', InMemoryTransportFactory::class)
138147
->tag('messenger.transport_factory')
139148
->tag('kernel.reset', ['method' => 'reset'])

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ protected function configure(): void
8585
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
8686
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
8787
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
88+
new InputOption('parallel-limit', 'p', InputOption::VALUE_REQUIRED, 'The number of concurrent processes', 10),
8889
])
8990
->setHelp(<<<'EOF'
9091
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -250,6 +251,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
250251
$options['queues'] = $queues;
251252
}
252253

254+
$options['parallel-limit'] = $input->getOption('parallel-limit');
255+
253256
try {
254257
$this->worker->run($options);
255258
} finally {
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
/*
13+
* This file is part of the Symfony package.
14+
*
15+
* (c) Fabien Potencier <fabien@symfony.com>
16+
*
17+
* For the full copyright and license information, please view the LICENSE
18+
* file that was distributed with this source code.
19+
*/
20+
21+
namespace Symfony\Component\Messenger;
22+
23+
use Amp\Cache\LocalCache;
24+
use Amp\Cancellation;
25+
use Amp\Parallel\Worker\Task;
26+
use Amp\Sync\Channel;
27+
use App\Kernel;
28+
use Psr\Container\ContainerInterface;
29+
use Symfony\Component\Dotenv\Dotenv;
30+
use Symfony\Component\Messenger\Exception\LogicException;
31+
use Symfony\Component\Messenger\Stamp\AckStamp;
32+
33+
class DispatchTask implements Task
34+
{
35+
private static ?LocalCache $cache = null;
36+
37+
public function __construct(private Envelope $envelope, private array $stamps, private readonly string $env, private readonly bool $isDebug)
38+
{
39+
}
40+
41+
public function run(Channel $channel, Cancellation $cancellation): mixed
42+
{
43+
$container = $this->getContainer();
44+
$envelope = $this->dispatch($container, $channel);
45+
46+
return $envelope->withoutStampsOfType(AckStamp::class);
47+
}
48+
49+
private function dispatch(ContainerInterface $container, $channel)
50+
{
51+
if (!$container->has(MessageBusInterface::class)) {
52+
throw new LogicException(sprintf("%s can't be found.", MessageBusInterface::class));
53+
}
54+
55+
$traceable = $container->get(MessageBusInterface::class);
56+
$messageBus = $traceable->getMessageBus();
57+
58+
return $messageBus->dispatch($this->envelope, $this->stamps);
59+
}
60+
61+
private function getContainer()
62+
{
63+
$cache = self::$cache ??= new LocalCache();
64+
$container = $cache->get('cache-container');
65+
66+
// if not in cache, create container
67+
if (!$container) {
68+
$kernel = new Kernel($this->env, $this->isDebug);
69+
$kernel->boot();
70+
71+
$container = $kernel->getContainer();
72+
$cache->set('cache-container', $container);
73+
74+
(new Dotenv())
75+
->setProdEnvs(['prod'])
76+
->usePutenv(false)
77+
->bootEnv('.env');
78+
}
79+
80+
return $container;
81+
}
82+
}

src/Symfony/Component/Messenger/MessageBusInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ interface MessageBusInterface
2727
*
2828
* @throws ExceptionInterface
2929
*/
30-
public function dispatch(object $message, array $stamps = []): Envelope;
30+
public function dispatch(object $message, array $stamps = []): Envelope|array;
3131
}

src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2121
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
2222
use Symfony\Component\Messenger\Stamp\AckStamp;
23+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
2324
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
2425
use Symfony\Component\Messenger\Stamp\HandledStamp;
2526
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
@@ -32,6 +33,8 @@ class HandleMessageMiddleware implements MiddlewareInterface
3233
{
3334
use LoggerAwareTrait;
3435

36+
private const PARALLEL_BUS = 'parallel_bus';
37+
3538
public function __construct(
3639
private HandlersLocatorInterface $handlersLocator,
3740
private bool $allowNoHandlers = false,
@@ -64,6 +67,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6467

6568
/** @var AckStamp $ackStamp */
6669
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
70+
if ($envelope->last(BusNameStamp::class) && self::PARALLEL_BUS === $envelope->last(BusNameStamp::class)->getBusName()) {
71+
throw new HandlerFailedException($envelope, [new LogicException("Parallel bus can't be used for batch messages")]);
72+
}
73+
6774
$ack = new Acknowledger(get_debug_type($batchHandler), static function (?\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
6875
if (null !== $e) {
6976
$e = new HandlerFailedException($envelope, [$handlerDescriptor->getName() => $e]);
@@ -75,7 +82,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
7582
});
7683

7784
$result = $this->callHandler($handler, $message, $ack, $envelope->last(HandlerArgumentsStamp::class));
78-
7985
if (!\is_int($result) || 0 > $result) {
8086
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
8187
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Amp\Parallel\Worker\ContextWorkerPool;
15+
use Symfony\Component\HttpKernel\KernelInterface;
16+
17+
use function Amp\async;
18+
use function Amp\Parallel\Worker\workerPool;
19+
20+
class ParallelMessageBus implements MessageBusInterface
21+
{
22+
public static ?ContextWorkerPool $worker = null;
23+
24+
public function __construct(private array $something, private readonly KernelInterface $kernel)
25+
{
26+
}
27+
28+
public function dispatch(object $message, array $stamps = []): Envelope|array
29+
{
30+
$worker = (self::$worker ??= workerPool());
31+
32+
$envelope = Envelope::wrap($message, $stamps);
33+
$task = new DispatchTask($envelope, $stamps, $this->kernel->getEnvironment(), $this->kernel->isDebug());
34+
35+
$future = async(function () use ($worker, $task) {
36+
return $worker->submit($task);
37+
});
38+
39+
return [$future, $envelope];
40+
}
41+
}

src/Symfony/Component/Messenger/RoutableMessageBus.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public function __construct(ContainerInterface $busLocator, ?MessageBusInterface
3434
$this->fallbackBus = $fallbackBus;
3535
}
3636

37-
public function dispatch(object $envelope, array $stamps = []): Envelope
37+
public function dispatch(object $envelope, array $stamps = []): Envelope|array
3838
{
3939
if (!$envelope instanceof Envelope) {
4040
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope.');

src/Symfony/Component/Messenger/TraceableMessageBus.php

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public function __construct(MessageBusInterface $decoratedBus)
2424
$this->decoratedBus = $decoratedBus;
2525
}
2626

27-
public function dispatch(object $message, array $stamps = []): Envelope
27+
public function dispatch(object $message, array $stamps = []): Envelope|array
2828
{
2929
$envelope = Envelope::wrap($message, $stamps);
3030
$context = [
@@ -41,7 +41,9 @@ public function dispatch(object $message, array $stamps = []): Envelope
4141

4242
throw $e;
4343
} finally {
44-
$this->dispatchedMessages[] = $context + ['stamps_after_dispatch' => array_merge([], ...array_values($envelope->all()))];
44+
if ($envelope instanceof Envelope) {
45+
$this->dispatchedMessages[] = $context + ['stamps_after_dispatch' => array_merge([], ...array_values($envelope->all()))];
46+
}
4547
}
4648
}
4749

@@ -102,4 +104,9 @@ private function getCaller(): array
102104
'line' => $line,
103105
];
104106
}
107+
108+
public function getMessageBus(): MessageBusInterface
109+
{
110+
return $this->decoratedBus;
111+
}
105112
}

src/Symfony/Component/Messenger/Worker.php

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
2828
use Symfony\Component\Messenger\Exception\RuntimeException;
2929
use Symfony\Component\Messenger\Stamp\AckStamp;
30+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
3031
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
3132
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
3233
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
@@ -49,6 +50,8 @@ class Worker
4950
private array $acks = [];
5051
private \SplObjectStorage $unacks;
5152

53+
private static $futures = [];
54+
5255
/**
5356
* @param ReceiverInterface[] $receivers Where the key is the transport name
5457
*/
@@ -103,11 +106,16 @@ public function run(array $options = []): void
103106
$envelopes = $receiver->get();
104107
}
105108

109+
if (!$envelopes) {
110+
// flush
111+
$this->handleFutures($transportName);
112+
}
113+
106114
foreach ($envelopes as $envelope) {
107115
$envelopeHandled = true;
108116

109117
$this->rateLimit($transportName);
110-
$this->handleMessage($envelope, $transportName);
118+
$this->handleMessage($envelope, $transportName, $options['parallel-limit']);
111119
$this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false));
112120

113121
if ($this->shouldStop) {
@@ -142,7 +150,7 @@ public function run(array $options = []): void
142150
$this->eventDispatcher?->dispatch(new WorkerStoppedEvent($this));
143151
}
144152

145-
private function handleMessage(Envelope $envelope, string $transportName): void
153+
private function handleMessage(Envelope $envelope, string $transportName, $parallelProcessesLimit): void
146154
{
147155
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
148156
$this->eventDispatcher?->dispatch($event);
@@ -153,17 +161,49 @@ private function handleMessage(Envelope $envelope, string $transportName): void
153161
}
154162

155163
$acked = false;
156-
$ack = function (Envelope $envelope, ?\Throwable $e = null) use ($transportName, &$acked) {
157-
$acked = true;
158-
$this->acks[] = [$transportName, $envelope, $e];
159-
};
164+
$busNameStamp = $envelope->last(BusNameStamp::class);
165+
166+
// "non concurrent" behaviour
167+
if ('parallel_bus' !== $busNameStamp->getBusName()) {
168+
$ack = function (Envelope $envelope, ?\Throwable $e = null) use ($transportName, &$acked) {
169+
$acked = true;
170+
$this->acks[] = [$transportName, $envelope, $e];
171+
};
172+
}
160173

161174
try {
162175
$e = null;
163-
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp(), new AckStamp($ack)));
176+
177+
$envelope = $envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp());
178+
if ('parallel_bus' !== $busNameStamp->getBusName()) {
179+
$envelope->with(new AckStamp($ack));
180+
}
181+
182+
$future = $this->bus->dispatch($envelope);
183+
184+
// "non concurrent" behaviour
185+
if ($future instanceof Envelope) {
186+
$this->preAck($envelope, $transportName, $acked, $e);
187+
188+
return;
189+
}
190+
191+
self::$futures[] = $future;
164192
} catch (\Throwable $e) {
193+
$this->preAck($envelope, $transportName, $acked, $e);
194+
195+
return;
165196
}
166197

198+
if (\count(self::$futures) < $parallelProcessesLimit) {
199+
return;
200+
}
201+
202+
$this->handleFutures($transportName);
203+
}
204+
205+
private function preAck(Envelope $envelope, string $transportName, bool $acked, $e): void
206+
{
167207
$noAutoAckStamp = $envelope->last(NoAutoAckStamp::class);
168208

169209
if (!$acked && !$noAutoAckStamp) {
@@ -282,4 +322,36 @@ public function getMetadata(): WorkerMetadata
282322
{
283323
return $this->metadata;
284324
}
325+
326+
public function handleFutures(string $transportName): void
327+
{
328+
$toHandle = self::$futures;
329+
self::$futures = [];
330+
$errors = [];
331+
332+
foreach ($toHandle as $future) {
333+
$e = null;
334+
try {
335+
$execution = $future[0]->await();
336+
$envelope = $execution->await();
337+
} catch (\Throwable $e) {
338+
$errors[] = [$future[1], $e];
339+
340+
continue;
341+
}
342+
343+
$this->preAck($envelope, $transportName, false, null);
344+
}
345+
346+
if (!$errors) {
347+
return;
348+
}
349+
350+
// otherwise, there might be an issue with the Messenger event as failedEvent (TO DO: test better)
351+
foreach ($errors as $combo) {
352+
[$envelope, $e] = $combo;
353+
354+
$this->preAck($envelope, $transportName, false, $e);
355+
}
356+
}
285357
}

0 commit comments

Comments
 (0)
0