10BC0 feature #43354 [Messenger] allow processing messages in batches (nico… · symfony/symfony@2d817e1 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2d817e1

Browse files
committed
feature #43354 [Messenger] allow processing messages in batches (nicolas-grekas)
This PR was merged into the 5.4 branch. Discussion ---------- [Messenger] allow processing messages in batches | Q | A | ------------- | --- | Branch? | 5.4 | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | #36910 | License | MIT | Doc PR | - This replaces #42873 as it proposes an alternative approach to handling messages in batch. `BatchHandlerInterface` says it all: if a handler implements this interface, then it should expect a new `$ack` optional argument to be provided when `__invoke()` is called. When `$ack` is not provided, `__invoke()` is expected to handle the message synchronously as usual. But when `$ack` is provided, `__invoke()` is expected to buffer the message and its `$ack` function, and to return the number of pending messages in the batch. Batch handlers are responsible for deciding when they flush their buffers, calling the `$ack` functions while doing so. Best reviewed [ignoring whitespaces](https://github.com/symfony/symfony/pull/43354/files?w=1). Here is what a batch handler might look like: ```php class MyBatchHandler implements BatchHandlerInterface { use BatchHandlerTrait; public function __invoke(MyMessage $message, Acknowledger $ack = null) { return $this->handle($message, $ack); } private function process(array $jobs): void { foreach ($jobs as [$job, $ack]) { try { // [...] compute $result from $job $ack->ack($result); } catch (\Throwable $e) { $ack->nack($e); } } } } ``` By default, `$jobs` contains the messages to handle, but it can be anything as returned by `BatchHandlerTrait::schedule()` (eg a Symfony HttpClient response derived from the message, a promise, etc.). The size of the batch is controlled by `BatchHandlerTrait::shouldProcess()` (defaults to 10). The transport is acknowledged in batch, *after* the bus returned from dispatching (unlike what is done in #42873). This is especially important when considering transactions since we don't want to ack unless the transaction committed successfully. By default, pending batches are flushed when the worker is idle and when it stops. Commits ------- 81e52b2 [Messenger] allow processing messages in batches
2 parents 39123f0 + 81e52b2 commit 2d817e1

File tree

14 files changed

+758
-75
lines changed

14 files changed

+758
-75
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ public function testItReceivesSignals()
215215
with stamps: [
216216
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp",
217217
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
218-
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
218+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp",
219+
"Symfony\\Component\\Messenger\\Stamp\\AckStamp"
219220
]
220221
Done.
221222

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add `AsMessageHandler` attribute for declaring message handlers on PHP 8.
8+
* Add support for handling messages in batches with `BatchHandlerInterface` and corresponding trait
89
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
910
* Add support for resetting container services after each messenger message.
1011
* Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
class Acknowledger
20+
{
21+
private $handlerClass;
22+
private $ack;
23+
private $error = null;
24+
private $result = null;
25+
26+
/**
27+
* @param null|\Closure(\Throwable|null, mixed):void $ack
28+
*/
29+
public function __construct(string $handlerClass, \Closure $ack = null)
30+
{
31+
$this->handlerClass = $handlerClass;
32+
$this->ack = $ack ?? static function () {};
33+
}
34+
35+
/**
36+
* @param mixed $result
37+
*/
38+
public function ack($result = null): void
39+
{
40+
$this->doAck(null, $result);
41+
}
42+
43+
public function nack(\Throwable $error): void
44+
{
45+
$this->doAck($error);
46+
}
47+
48+
public function getError(): ?\Throwable
49+
{
50+
return $this->error;
51+
}
52+
53+
/**
54+
* @return mixed
55+
*/
56+
public function getResult()
57+
{
58+
return $this->result;
59+
}
60+
61+
public function isAcknowledged(): bool
62+
{
63+
return null === $this->ack;
64+
}
65+
66+
public function __destruct()
67+
{
68+
if ($this->ack instanceof \Closure) {
69+
throw new LogicException(sprintf('The acknowledger was not called by the "%s" batch handler.', $this->handlerClass));
70+
}
71+
}
72+
73+
private function doAck(\Throwable $e = null, $result = null): void
74+
{
75+
if (!$ack = $this->ack) {
76+
throw new LogicException(sprintf('The acknowledger cannot be called twice by the "%s" batch handler.', $this->handlerClass));
77+
}
78+
$this->ack = null;
79+
$this->error = $e;
80+
$this->result = $result;
81+
$ack($e, $result);
82+
}
83+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*/
17+
interface BatchHandlerInterface
18+
{
19+
/**
20+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
21+
* The message should be handled synchronously when null.
22+
*
23+
* @return mixed The number of pending messages in the batch if $ack is not null,
24+
* the result from handling the message otherwise
25+
*/
26+
//public function __invoke(object $message, Acknowledger $ack = null): mixed;
27+
28+
/**
29+
* Flushes any pending buffers.
30+
*
31+
* @param bool $force Whether flushing is required; it can be skipped if not
32+
*/
33+
public function flush(bool $force): void;
34+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
trait BatchHandlerTrait
20+
{
21+
private $jobs = [];
22+
23+
/**
24+
* {@inheritdoc}
25+
*/
26+
public function flush(bool $force): void
27+
{
28+
if ($jobs = $this->jobs) {
29+
$this->jobs = [];
30+
$this->process($jobs);
31+
}
32+
}
33+
34+
/**
35+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
36+
* The message should be handled synchronously when null.
37+
*
38+
* @return mixed The number of pending messages in the batch if $ack is not null,
39+
* the result from handling the message otherwise
40+
*/
41+
private function handle(object $message, ?Acknowledger $ack)
42+
{
43+
if (null === $ack) {
44+
$ack = new Acknowledger(get_debug_type($this));
45+
$this->jobs[] = [$message, $ack];
46+
$this->flush(true);
47+
48+
return $ack->getResult();
49+
}
50+
51+
$this->jobs[] = [$message, $ack];
52+
if (!$this->shouldFlush()) {
53+
return \count($this->jobs);
54+
}
55+
56+
$this->flush(true);
57+
58+
return 0;
59+
}
60+
61+
private function shouldFlush(): bool
62+
{
63+
return 10 <= \count($this->jobs);
64+
}
65+
66+
/**
67+
* Completes the jobs in the list.
68+
*
69+
* @list<array{0: object, 1: Acknowledger}> $jobs A list of pairs of messages and their corresponding acknowledgers
70+
*/
71+
private function process(array $jobs): void
72+
{
73+
throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this)));
74+
}
75+
}

src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,34 @@
1919
final class HandlerDescriptor
2020
{
2121
private $handler;
22+
private $name;
23+
private $batchHandler;
2224
private $options;
2325

2426
public function __construct(callable $handler, array $options = [])
2527
{
28+
if (!$handler instanceof \Closure) {
29+
$handler = \Closure::fromCallable($handler);
30+
}
31+
2632
$this->handler = $handler;
2733
$this->options = $options;
34+
35+
$r = new \ReflectionFunction($handler);
36+
37+
if (str_contains($r->name, '{closure}')) {
38+
$this->name = 'Closure';
39+
} elseif (!$handler = $r->getClosureThis()) {
40+
$class = $r->getClosureScopeClass();
41+
42+
$this->name = ($class ? $class->name.'::' : '').$r->name;
43+
} else {
44+
if ($handler instanceof BatchHandlerInterface) {
45+
$this->batchHandler = $handler;
46+
}
47+
48+
$this->name = \get_class($handler).'::'.$r->name;
49+
}
2850
}
2951

3052
public function getHandler(): callable
@@ -34,7 +56,7 @@ public function getHandler(): callable
3456

3557
public function getName(): string
3658
{
37-
$name = $this->callableName($this->handler);
59+
$name = $this->name;
3860
$alias = $this->options['alias'] ?? null;
3961

4062
if (null !== $alias) {
@@ -44,37 +66,13 @@ public function getName(): string
4466
return $name;
4567
}
4668

47-
public function getOption(string $option)
69+
public function getBatchHandler(): ?BatchHandlerInterface
4870
{
49-
return $this->options[$option] ?? null;
71+
return $this->batchHandler;
5072
}
5173

52-
private function callableName(callable $handler): string
74+
public function getOption(string $option)
5375
{
54-
if (\is_array($handler)) {
55-
if (\is_object($handler[0])) {
56-
return \get_class($handler[0]).'::'.$handler[1];
57-
}
58-
59-
return $handler[0].'::'.$handler[1];
60-
}
61-
62-
if (\is_string($handler)) {
63-
return $handler;
64-
}
65-
66-
if ($handler instanceof \Closure) {
67-
$r = new \ReflectionFunction($handler);
68-
if (str_contains($r->name, '{closure}')) {
69-
return 'Closure';
70-
}
71-
if ($class = $r->getClosureScopeClass()) {
72-
return $class->name.'::'.$r->name;
73-
}
74-
75-
return $r->name;
76-
}
77-
78-
return \get_class($handler).'::__invoke';
76+
return $this->options[$option] ?? null;
7977
}
8078
}

src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Exception\HandlerFailedException;
18+
use Symfony\Component\Messenger\Exception\LogicException;
1819
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
20+
use Symfony\Component\Messenger\Handler\Acknowledger;
1921
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2022
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
23+
use Symfony\Component\Messenger\Stamp\AckStamp;
24+
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
2125
use Symfony\Component\Messenger\Stamp\HandledStamp;
26+
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
2227

2328
/**
2429
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -60,14 +65,58 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6065

6166
try {
6267
$handler = $handlerDescriptor->getHandler();
63-
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
68+
$batchHandler = $handlerDescriptor->getBatchHandler();
69+
70+
/** @var AckStamp $ackStamp */
71+
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
72+
$ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
73+
if (null !== $e) {
74+
$e = new HandlerFailedException($envelope, [$e]);
75+
} else {
76+
$envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result));
77+
}
78+
79+
$ackStamp->ack($envelope, $e);
80+
});
81+
82+
$result = $handler($message, $ack);
83+
84+
if (!\is_int($result) || 0 > $result) {
85+
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
86+
}
87+
88+
if (!$ack->isAcknowledged()) {
89+
$envelope = $envelope->with(new NoAutoAckStamp($handlerDescriptor));
90+
} elseif ($ack->getError()) {
91+
throw $ack->getError();
92+
} else {
93+
$result = $ack->getResult();
94+
}
95+
} else {
96+
$result = $handler($message);
97+
}
98+
99+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $result);
64100
$envelope = $envelope->with($handledStamp);
65101
$this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
66102
} catch (\Throwable $e) {
67103
$exceptions[] = $e;
68104
}
69105
}
70106

107+
/** @var FlushBatchHandlersStamp $flushStamp */
108+
if ($flushStamp = $envelope->last(FlushBatchHandlersStamp::class)) {
109+
/** @var NoAutoAckStamp $stamp */
110+
foreach ($envelope->all(NoAutoAckStamp::class) as $stamp) {
111+
try {
112+
$handler = $stamp->getHandlerDescriptor()->getBatchHandler();
113+
$handler->flush($flushStamp->force());
114+
} catch (\Throwable $e) {
115+
$exceptions[] = $e;
116+
}
117+
}
118+
}
119+
71120
if (null === $handler) {
72121
if (!$this->allowNoHandlers) {
73122
throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class']));
@@ -85,11 +134,13 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
85134

86135
private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
87136
{
88-
$some = array_filter($envelope
89-
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) {
90-
return $stamp->getHandlerName() === $handlerDescriptor->getName();
91-
});
137+
/** @var HandledStamp $stamp */
138+
foreach ($envelope->all(HandledStamp::class) as $stamp) {
139+
if ($stamp->getHandlerName() === $handlerDescriptor->getName()) {
140+
return true;
141+
}
142+
}
92143

93-
return \count($some) > 0;
144+
return false;
94145
}
95146
}

0 commit comments

Comments
 (0)
0