8000 [Messenger] allow processing messages in batches · symfony/symfony@456f94c · GitHub
[go: up one dir, main page]

Skip to content

Commit 456f94c

Browse files
[Messenger] allow processing messages in batches
1 parent 8523047 commit 456f94c

File tree

13 files changed

+716
-65
lines changed

13 files changed

+716
-65
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ public function testItReceivesSignals()
215215
with stamps: [
216216
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp",
217217
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
218-
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
218+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp",
219+
"Symfony\\Component\\Messenger\\Stamp\\AckStamp"
219220
]
220221
Done.
221222

src/Symfony/Component/Messenger/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
5.4
55
---
66

7+
* Add support for handling messages in batches with `BatchHandlerInterface` and corresponding trait
78
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
89
* Add support for resetting container services after each messenger message.
910
* Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
class Acknowledger
20+
{
21+
private $handlerClass;
22+
private $ack;
23+
private $error = null;
24+
private $result = null;
25+
26+
public function __construct(string $handlerClass, \Closure $ack = null)
27+
{
28+
$this->handlerClass = $handlerClass;
29+
$this->ack = $ack ?? static function () {};
30+
}
31+
32+
/**
33+
* @param mixed $result
34+
*/
35+
public function ack($result = null): void
36+
{
37+
$this->doAck(null, $result);
38+
}
39+
40+
public function nack(\Throwable $error): void
41+
{
42+
$this->doAck($error);
43+
}
44+
45+
public function getError(): ?\Throwable
46+
{
47+
return $this->error;
48+
}
49+
50+
/**
51+
* @return mixed
52+
*/
53+
public function getResult()
54+
{
55+
return $this->result;
56+
}
57+
58+
public function isAcknowledged(): bool
59+
{
60+
return null === $this->ack;
61+
}
62+
63+
public function __destruct()
64+
{
65+
if ($this->ack instanceof \Closure) {
66+
$this->doAck(new LogicException(sprintf('The ack function was not called by the batch handler "%s".', $this->handlerClass)));
67+
}
68+
}
69+
70+
private function doAck(\Throwable $e = null, $result = null): void
71+
{
72+
if (!$ack = $this->ack) {
73+
throw new LogicException(sprintf('The ack function cannot be called twice by the batch handler "%s".', $this->handlerClass));
74+
}
75+
$this->ack = null;
76+
$this->error = $e;
77+
$this->result = $result;
78+
$ack($e, $result);
79+
}
80+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*/
17+
interface BatchHandlerInterface
18+
{
19+
/**
20+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
21+
* The message should be handled synchronously when null.
22+
*
23+
* @return mixed The number of pending messages in the batch if $ack is not null,
24+
* the result from handling the message otherwise
25+
*/
26+
//public function __invoke(object $message, Acknowledger $ack = null): mixed;
27+
28+
/**
29+
* Flushes any pending buffers.
30+
*/
31+
public function flush(): void;
32+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
trait BatchHandlerTrait
20+
{
21+
private $jobs = [];
22+
23+
/**
24+
* {@ 85EA inheritdoc}
25+
*/
26+
public function flush(): void
27+
{
28+
if ($jobs = $this->jobs) {
29+
$this->jobs = [];
30+
$this->process($jobs);
31+
}
32+
}
33+
34+
/**
35+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
36+
* The message should be handled synchronously when null.
37+
*
38+
* @return int The number of pending messages in the batch if $ack is not null,
39+
* the result from handling the message otherwise
40+
*/
41+
private function handle(object $message, ?Acknowledger $ack)
42+
{
43+
$result = null !== $ack ? 0 : null;
44+
$this->jobs[] = [$this->schedule($message), $ack ?? $ack = new Acknowledger(get_debug_type($this))];
45+
46+
if (null !== $result && !$this->shouldFlush()) {
47+
return \count($this->jobs);
48+
}
49+
$this->flush();
50+
51+
return $result ?? $ack->getResult();
52+
}
53+
54+
private function shouldFlush(): bool
55+
{
56+
return 10 <= \count($this->jobs);
57+
}
58+
59+
/**
60+
* Schedules a message for processing.
61+
*
62+
* @return mixed A value to pass to process() for batch handling
63+
*/
64+
private function schedule(object $message)
65+
{
66+
return $message;
67+
}
68+
69+
/**
70+
* Completes the jobs in the list.
71+
*
72+
* @list<array{0: mixed, 1: Acknowledger}> $jobs A list of pairs of values as returned by schedule()
73+
* and their corresponding acknowledgers
74+
*/
75+
private function process(array $jobs): void
76+
{
77+
throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this)));
78+
}
79+
}

src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
final class HandlerDescriptor
2020
{
2121
private $handler;
22+
private $name;
23+
private $batchHandler;
2224
private $options;
2325

2426
public function __construct(callable $handler, array $options = [])
@@ -34,7 +36,7 @@ public function getHandler(): callable
3436

3537
public function getName(): string
3638
{
37-
$name = $this->callableName($this->handler);
39+
$name = $this->name ?? $this->callableName($this->handler);
3840
$alias = $this->options['alias'] ?? null;
3941

4042
if (null !== $alias) {
@@ -44,37 +46,42 @@ public function getName(): string
4446
return $name;
4547
}
4648

49+
public function getBatchHandler(): ?BatchHandlerInterface
50+
{
51+
if (null === $this->name) {
52+
$this->callableName($this->handler);
53+
}
54+
55+
return $this->batchHandler;
56+
}
57+
4758
public function getOption(string $option)
4859
{
4960
return $this->options[$option] ?? null;
5061
}
5162

5263
private function callableName(callable $handler): string
5364
{
54-
if (\is_array($handler)) {
55-
if (\is_object($handler[0])) {
56-
return \get_class($handler[0]).'::'.$handler[1];
57-
}
58-
59-
return $handler[0].'::'.$handler[1];
65+
if (!$handler instanceof \Closure) {
66+
$handler = \Closure::fromCallable($handler);
6067
}
6168

62-
if (\is_string($handler)) {
63-
return $handler;
69+
$r = new \ReflectionFunction($handler);
70+
71+
if (str_contains($r->name, '{closure}')) {
72+
return $this->name = 'Closure';
6473
}
6574

66-
if ($handler instanceof \Closure) {
67-
$r = new \ReflectionFunction($handler);
68-
if (str_contains($r->name, '{closure}')) {
69-
return 'Closure';
70-
}
71-
if ($class = $r->getClosureScopeClass()) {
72-
return $class->name.'::'.$r->name;
73-
}
75+
if (!$handler = $r->getClosureThis()) {
76+
$class = $r->getClosureScopeClass();
77+
78+
return $this->name = ($class ? $class->name.'::' : '').$r->name;
79+
}
7480

75-
return $r->name;
81+
if ($handler instanceof BatchHandlerInterface) {
82+
$this->batchHandler = $handler;
7683
}
7784

78-
return \get_class($handler).'::__invoke';
85+
return $this->name = \get_class($handler).'::'.$r->name;
7986
}
8087
}

src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Exception\HandlerFailedException;
18+
use Symfony\Component\Messenger\Exception\LogicException;
1819
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
20+
use Symfony\Component\Messenger\Handler\Acknowledger;
1921
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2022
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
23+
use Symfony\Component\Messenger\Stamp\AckStamp;
2124
use Symfony\Component\Messenger\Stamp\HandledStamp;
25+
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
2226

2327
/**
2428
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -53,14 +57,57 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
5357
];
5458

5559
$exceptions = [];
60+
61+
foreach ($envelope->all(NoAutoAckStamp::class) as $stamp) {
62+
try {
63+
$handler = $stamp->getHandlerDescriptor()->getBatchHandler();
64+
$handler->flush();
65+
} catch (\Throwable $e) {
66+
$exceptions[] = $e;
67+
}
68+
}
69+
5670
foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
5771
if ($this->messageHasAlreadyBeenHandled($envelope, $handlerDescriptor)) {
5872
continue;
5973
}
6074

6175
try {
6276
$handler = $handlerDescriptor->getHandler();
63-
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
77+
$batchHandler = $handlerDescriptor->getBatchHandler();
78+
$ack = null;
79+
80+
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
81+
$ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
82+
if (null !== $e) {
83+
$e = new HandlerFailedException($envelope, [$e]);
84+
} else {
85+
$envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result));
86+
}
87+
88+
$ackStamp->ack($envelope, $e);
89+
});
90+
}
91+
92+
if (null === $ack) {
93+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
94+
} else {
95+
$batchSize = $handler($message, $ack);
96+
97+
if (!\is_int($batchSize) || 0 > $batchSize) {
98+
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($batchSize) ? $batchSize : get_debug_type($batchSize), get_debug_type($batchHandler)));
99+
}
100+
101+
if (!$ack->isAcknowledged()) {
102+
$envelope = $envelope->with(new NoAutoAckStamp($handlerDescriptor));
103+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $batchSize);
104+
} elseif ($ack->getError()) {
105+
throw $ack->getError();
106+
} else {
107+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $ack->getResult());
108+
}
109+
}
110+
64111
$envelope = $envelope->with($handledStamp);
65112
$this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
66113
} catch (\Throwable $e) {
@@ -85,11 +132,12 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
85132

86133
private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
87134
{
88-
$some = array_filter($envelope
89-
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) {
90-
return $stamp->getHandlerName() === $handlerDescriptor->getName();
91-
});
135+
foreach ($envelope->all(HandledStamp::class) as $stamp) {
136+
if ($stamp->getHandlerName() === $handlerDescriptor->getName()) {
137+
return true;
138+
}
139+
}
92140

93-
return \count($some) > 0;
141+
return false;
94142
}
95143
}

0 commit comments

Comments
 (0)
0