8000 [Messenger] allow processing messages in batches · symfony/symfony@9cf1bc7 · GitHub
[go: up one dir, main page]

Skip to content

Commit 9cf1bc7

Browse files
[Messenger] allow processing messages in batches
1 parent 8523047 commit 9cf1bc7

12 files changed

+570
-65
lines changed

src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ public function testItReceivesSignals()
215215
with stamps: [
216216
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp",
217217
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
218-
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
218+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp",
219+
"Symfony\\Component\\Messenger\\Stamp\\AckStamp"
219220
]
220221
Done.
221222

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
class Acknowledger
20+
{
21+
private $handlerClass;
22+
private $ack;
23+
private $error = null;
24+
private $result = null;
25+
26+
public function __construct(string $handlerClass, \Closure $ack = null)
27+
{
28+
$this->handlerClass = $handlerClass;
29+
$this->ack = $ack ?? static function () {};
30+
}
31+
32+
/**
33+
* @param mixed $result
34+
*/
35+
public function ack($result = null): void
36+
{
37+
$this->doAck(null, $result);
38+
}
39+
40+
public function nack(\Throwable $error): void
41+
{
42+
$this->doAck($error);
43+
}
44+
45+
public function getError(): ?\Throwable
46+
{
47+
return $this->error;
48+
}
49+
50+
/**
51+
* @return mixed
52+
*/
53+
public function getResult()
54+
{
55+
return $this->result;
56+
}
57+
58+
public function isAcknowledged(): bool
59+
{
60+
return null === $this->ack;
61+
}
62+
63+
public function __destruct()
64+
{
65+
if ($this->ack instanceof \Closure) {
66+
$this->doAck(new LogicException(sprintf('The ack function was not called by the batch handler "%s".', $this->handlerClass)));
67+
}
68+
}
69+
70+
private function doAck(\Throwable $e = null, $result = null): void
71+
{
72+
if (!$ack = $this->ack) {
73+
throw new LogicException(sprintf('The ack function cannot be called twice by the batch handler "%s".', $this->handlerClass));
74+
}
75+
$this->ack = null;
76+
$this->error = $e;
77+
$this->result = $result;
78+
$ack($e, $result);
79+
}
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*/
17+
interface BatchHandlerInterface
18+
{
19+
/*
20+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
21+
* The message should be handled synchronously when null.
22+
*
23+
* @return mixed The number of pending messages in the batch if $ack is not null,
24+
* the result from handling the message otherwise
25+
*/
26+
//public function __invoke(object $message, Acknowledger $ack = null): mixed;
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
trait BatchHandlerTrait
20+
{
21+
private $jobs = [];
22+
23+
/**
24+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
25+
* The message should be handled synchronously when null.
26+
*
27+
* @return int The number of pending messages in the batch if $ack is not null,
28+
* the result from handling the message otherwise
29+
*/
30+
private function handle(object $message, ?Acknowledger $ack)
31+
{
32+
$result = null !== $ack ? 0 : null;
33+
$this->jobs[] = [$this->schedule($message), $ack ?? $ack = new Acknowledger(get_debug_type($this))];
34+
35+
if (null !== $result && !$this->shouldProcess()) {
36+
return \count($this->jobs);
37+
}
38+
39+
$jobs = $this->jobs;
40+
$this->jobs = [];
41+
$this->process($jobs);
42+
43+
return $result ?? $ack->getResult();
44+
}
45+
46+
private function shouldProcess(): bool
47+
{
48+
return 10 <= \count($this->jobs);
49+
}
50+
51+
/**
52+
* Schedules a message for processing.
53+
*
54+
* @return mixed A value to pass to process() for batch handling
55+
*/
56+
private function schedule(object $message)
57+
{
58+
return $message;
59+
}
60+
61+
/**
62+
* Completes the jobs in the list.
63+
*
64+
* @list<array{0: mixed, 1: Acknowledger}> $jobs A list of pairs of values as returned by schedule()
65+
* and their corresponding acknowledgers
66+
*/
67+
private function process(array $jobs): void
68+
{
69+
throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this)));
70+
}
71+
}

src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php

+26-19
+
return $this->name = \get_class($handler).'::'.$r->name;
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
final class HandlerDescriptor
2020
{
2121
private $handler;
22+
private $name;
23+
private $batchHandler;
2224
private $options;
2325

2426
public function __construct(callable $handler, array $options = [])
@@ -34,7 +36,7 @@ public function getHandler(): callable
3436

3537
public function getName(): string
3638
{
37-
$name = $this->callableName($this->handler);
39+
$name = $this->name ?? $this->callableName($this->handler);
3840
$alias = $this->options['alias'] ?? null;
3941

4042
if (null !== $alias) {
@@ -44,37 +46,42 @@ public function getName(): string
4446
return $name;
4547
}
4648

49+
public function getBatchHandler(): ?BatchHandlerInterface
50+
{
51+
if (null === $this->name) {
52+
$this->callableName($this->handler);
53+
}
54+
55+
return $this->batchHandler;
56+
}
57+
4758
public function getOption(string $option)
4859
{
4960
return $this->options[$option] ?? null;
5061
}
5162

5263
private function callableName(callable $handler): string
5364
{
54-
if (\is_array($handler)) {
55-
if (\is_object($handler[0])) {
56-
return \get_class($handler[0]).'::'.$handler[1];
57-
}
58-
59-
return $handler[0].'::'.$handler[1];
65+
if (!$handler instanceof \Closure) {
66+
$handler = \Closure::fromCallable($handler);
6067
}
6168

62-
if (\is_string($handler)) {
63-
return $handler;
69+
$r = new \ReflectionFunction($handler);
70+
71+
if (str_contains($r->name, '{closure}')) {
72+
return $this->name = 'Closure';
6473
}
6574

66-
if ($handler instanceof \Closure) {
67-
$r = new \ReflectionFunction($handler);
68-
if (str_contains($r->name, '{closure}')) {
69-
return 'Closure';
70-
}
71-
if ($class = $r->getClosureScopeClass()) {
72-
return $class->name.'::'.$r->name;
73-
}
75+
if (!$handler = $r->getClosureThis()) {
76+
$class = $r->getClosureScopeClass();
77+
78+
return $this->name = ($class ? $class->name.'::' : '').$r->name;
79+
}
7480

75-
return $r->name;
81+
if ($handler instanceof BatchHandlerInterface) {
82+
$this->batchHandler = $handler;
7683
}
7784

78-
return \get_class($handler).'::__invoke';
85
7986
}
8087
}

src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

+38-1
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Exception\HandlerFailedException;
18+
use Symfony\Component\Messenger\Exception\LogicException;
1819
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
20+
use Symfony\Component\Messenger\Handler\Acknowledger;
1921
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2022
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
23+
use Symfony\Component\Messenger\Stamp\AckStamp;
2124
use Symfony\Component\Messenger\Stamp\HandledStamp;
25+
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
2226

2327
/**
2428
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -60,7 +64,40 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6064

6165
try {
6266
$handler = $handlerDescriptor->getHandler();
63-
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
67+
$batchHandler = $handlerDescriptor->getBatchHandler();
68+
$ack = null;
69+
70+
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
71+
$ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
72+
if (null !== $e) {
73+
$e = new HandlerFailedException($envelope, [$e]);
74+
} else {
75+
$envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result));
76+
}
77+
78+
$ackStamp->ack($envelope, $e);
79+
});
80+
}
81+
82+
if (null === $ack) {
83+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
84+
} else {
85+
$batchSize = $handler($message, $ack);
86+
87+
if (!\is_int($batchSize) || 0 > $batchSize) {
88+
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($batchSize) ? $batchSize : get_debug_type($batchSize), get_debug_type($batchHandler)));
89+
}
90+
91+
if (!$ack->isAcknowledged()) {
92+
$envelope = $envelope->with(new NoAutoAckStamp());
93+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $batchSize);
94+
} elseif ($ack->getError()) {
95+
throw $ack->getError();
96+
} else {
97+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $ack->getResult());
98+
}
99+
}
100+
64101
$envelope = $envelope->with($handledStamp);
65102
$this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
66103
} catch (\Throwable $e) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
16+
/**
17+
* Marker stamp for messages that can be ack/nack'ed.
18+
*/
19+
final class AckStamp implements NonSendableStampInterface
20+
{
21+
private $ack;
22+
23+
public function __construct(\Closure $ack)
24+
{
25+
$this->ack = $ack;
26+
}
27+
28+
public function ack(Envelope $envelope, \Throwable $e = null): void
29+
{
30+
($this->ack)($envelope, $e);
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* A marker that ack'ing for this message should not be done automatically.
16+
*/
17+
final class NoAutoAckStamp implements NonSendableStampInterface
18+
{
19+
}

0 commit comments

Comments
 (0)
0