8000 Allows to register handlers on a specific transport (and get rid of t… · symfony/symfony@1de1cb2 · GitHub
[go: up one dir, main page]

Skip to content

Commit 1de1cb2

Browse files
committed
Allows to register handlers on a specific transport (and get rid of this handler alias)
1 parent f82f1c0 commit 1de1cb2

22 files changed

+418
-209
lines changed

src/Symfony/Component/Messenger/Command/DebugCommand.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ protected function execute(InputInterface $input, OutputInterface $output)
8484
foreach ($handlersByMessage as $message => $handlers) {
8585
$tableRows[] = [sprintf('<fg=cyan>%s</fg=cyan>', $message)];
8686
foreach ($handlers as $handler) {
87-
$tableRows[] = [sprintf(' handled by <info>%s</>', $handler)];
87+
$tableRows[] = [
88+
sprintf(' handled by <info>%s</>', $handler[0]).$this->formatConditions($handler[1]),
89+
];
8890
}
8991
}
9092

@@ -97,4 +99,18 @@ protected function execute(InputInterface $input, OutputInterface $output)
9799
}
98100
}
99101
}
102+
103+
private function formatConditions(array $options): string
104+
{
105+
if (empty($options)) {
106+
return '';
107+
}
108+
109+
$optionsMapping = [];
110+
foreach ($options as $key => $value) {
111+
$optionsMapping[] = ' '.$key.'='.$value;
112+
}
113+
114+
return ' (when'.implode(', ', $optionsMapping).')';
115+
}
100116
}

src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Symfony\Component\DependencyInjection\Definition;
2020
use Symfony\Component\DependencyInjection\Exception\RuntimeException;
2121
use Symfony\Component\DependencyInjection\Reference;
22+
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2223
use Symfony\Component\Messenger\Handler\HandlersLocator;
2324
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
2425
use Symfony\Component\Messenger\TraceableMessageBus;
@@ -94,32 +95,33 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
9495
$message = null;
9596
$handlerBuses = (array) ($tag['bus'] ?? $busIds);
9697

97-
foreach ($handles as $message => $method) {
98+
foreach ($handles as $message => $options) {
9899
$buses = $handlerBuses;
100+
99101
if (\is_int($message)) {
100-
$message = $method;
101-
$method = '__invoke';
102+
if (\is_string($options)) {
103+
$message = $options;
104+
$options = [];
105+
} else {
106+
throw new RuntimeException(sprintf('The handler configuration needs to return an array of messages or an associated array of message and configuration. Found value of type "%s" at position "%d" for service "%s".', \gettype($options), $message, $serviceId));
107+
}
102108
}
103109

104-
if (\is_array($message)) {
105-
list($message, $priority) = $message;
106-
} else {
107-
$priority = $tag['priority'] ?? 0;
110+
if (\is_string($options)) {
111+
$options = ['method' => $options];
108112
}
109113

110-
if (\is_array($method)) {
111-
if (isset($method['bus'])) {
112-
if (!\in_array($method['bus'], $busIds)) {
113-
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : $r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
114+
$priority = $tag['priority'] ?? $options['priority'] ?? 0;
115+
$method = $options['method'] ?? '__invoke';
114116

115-
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $method['bus']));
116-
}
117+
if (isset($options['bus'])) {
118+
if (!\in_array($options['bus'], $busIds)) {
119+
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : $r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
117120

118-
$buses = [$method['bus']];
121+
throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus']));
119122
}
120123

121-
$priority = $method['priority'] ?? $priority;
122-
$method = $method['method'] ?? '__invoke';
124+
$buses = [$options['bus']];
123125
}
124126

125127
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
@@ -141,7 +143,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
141143
}
142144

143145
foreach ($buses as $handlerBus) {
144-
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = $definitionId;
146+
$handlersByBusAndMessage[$handlerBus][$message][$priority][] = [$definitionId, $options];
145147
}
146148
}
147149

@@ -154,15 +156,20 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
154156
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
155157
foreach ($handlersByMessage as $message => $handlersByPriority) {
156158
krsort($handlersByPriority);
157-
$handlersByBusAndMessage[$bus][$message] = array_unique(array_merge(...$handlersByPriority));
159+
$handlersByBusAndMessage[$bus][$message] = array_merge(...$handlersByPriority);
158160
}
159161
}
160162

161163
$handlersLocatorMappingByBus = [];
162164
foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
163-
foreach ($handlersByMessage as $message => $handlerIds) {
164-
$handlers = array_map(function (string $handlerId) { return new Reference($handlerId); }, $handlerIds);
165-
$handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlers);
165+
foreach ($handlersByMessage as $message => $handlers) {
166+
$handlerDescriptors = [];
167+
foreach ($handlers as $handler) {
168+
$definitions[$definitionId = '.messenger.handler_descriptor.'.ContainerBuilder::hash($bus.':'.$message.':'.$handler[0])] = (new Definition(HandlerDescriptor::class))->setArguments([new Reference($handler[0]), $handler[1]])->setFactory([HandlerDescriptor::class, 'create']);
169+
$handlerDescriptors[] = new Reference($definitionId);
170+
}
171+
172+
$handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlerDescriptors);
166173
}
167174
}
168175
$container->addDefinitions($definitions);

src/Symfony/Component/Messenger/HandleTrait.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ private function handle($message)
5252

5353
if (\count($handledStamps) > 1) {
5454
$handlers = implode(', ', array_map(function (HandledStamp $stamp): string {
55-
return sprintf('"%s"', $stamp->getHandlerAlias() ?? $stamp->getCallableName());
55+
return sprintf('"%s"', $stamp->getHandlerName());
5656
}, $handledStamps));
5757

5858
throw new LogicException(sprintf('Message of type "%s" was handled multiple times. Only one handler is expected when using "%s::%s()", got %d: %s.', \get_class($envelope->getMessage()), \get_class($this), __FUNCTION__, \count($handledStamps), $handlers));
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Samuel Rozeé
16+
*
17+
* @final
18+
*/
19+
class HandlerDescriptor
20+
{
21+
private $handler;
22+
private $options;
23+
24+
public function __construct(callable $handler, array $options = [])
25+
{
26+
$this->handler = $handler;
27+
$this->options = $options;
28+
}
29+
30+
public function getHandler(): callable
31+
{
32+
return $this->handler;
33+
}
34+
35+
public function getName(): string
36+
{
37+
$name = $this->callableName($this->handler);
38+
$alias = $this->options['alias'] ?? null;
39+
40+
if (null !== $alias) {
41+
$name .= '@'.$alias;
42+
}
43+
44+
return $name;
45+
}
46+
47+
private function callableName(callable $handler)
48+
{
49+
if (\is_array($handler)) {
50+
if (\is_object($handler[0])) {
51+
return \get_class($handler[0]).'::'.$handler[1];
52+
}
53+
54+
return $handler[0].'::'.$handler[1];
55+
}
56+
57+
if (\is_string($handler)) {
58+
return $handler;
59+
}
60+
61+
if ($handler instanceof \Closure) {
62+
$r = new \ReflectionFunction($handler);
63+
if (false !== strpos($r->name, '{closure}')) {
64+
return 'Closure';
65+
}
66+
if ($class = $r->getClosureScopeClass()) {
67+
return $class->name.'::'.$r->name;
68+
}
69+
70+
return $r->name;
71+
}
72+
73+
return \get_class($handler).'::__invoke';
74+
}
75+
76+
public function getOption(string $option)
77+
{
78+
return $this->options[$option] ?? null;
79+
}
80+
}

src/Symfony/Component/Messenger/Handler/HandlersLocator.php

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
namespace Symfony\Component\Messenger\Handler;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
1516

1617
/**
1718
* Maps a message to a list of handlers.
1819
*
1920
* @author Nicolas Grekas <p@tchwork.com>
21+
* @author Samuel Roze <samuel.roze@gmail.com>
2022
*
2123
* @experimental in 4.2
2224
*/
@@ -25,7 +27,7 @@ class HandlersLocator implements HandlersLocatorInterface
2527
private $handlers;
2628

2729
/**
28-
* @param callable[][] $handlers
30+
* @param HandlerDescriptor[][]|callable[][] $handlers
2931
*/
3032
public function __construct(array $handlers)
3133
{
@@ -40,10 +42,23 @@ public function getHandlers(Envelope $envelope): iterable
4042
$seen = [];
4143

4244
foreach (self::listTypes($envelope) as $type) {
43-
foreach ($this->handlers[$type] ?? [] as $alias => $handler) {
44-
if (!\in_array($handler, $seen, true)) {
45-
yield $alias => $seen[] = $handler;
45+
foreach ($this->handlers[$type] ?? [] as $handlerDescriptor) {
46+
if (\is_callable($handlerDescriptor)) {
47+
$handlerDescriptor = new HandlerDescriptor($handlerDescriptor);
4648
}
49+
50+
if (!$this->shouldHandle($envelope, $handlerDescriptor)) {
51+
continue;
52+
}
53+
54+
$name = $handlerDescriptor->getName();
55+
if (\in_array($name, $seen)) {
56+
continue;
57+
}
58+
59+
$seen[] = $name;
60+
61+
yield $handlerDescriptor;
4762
}
4863
}
4964
}
@@ -60,4 +75,17 @@ public static function listTypes(Envelope $envelope): array
6075
+ class_implements($class)
6176
+ ['*' => '*'];
6277
}
78+
79+
private function shouldHandle(Envelope $envelope, HandlerDescriptor $handlerDescriptor)
80+
{
81+
if (null === $received = $envelope->last(ReceivedStamp::class)) {
82+
return true;
83+
}
84+
85+
if (null === $expectedTransport = $handlerDescriptor->getOption('transport')) {
86+
return true;
87+
}
88+
89+
return $received->getTransportName() === $expectedTransport;
90+
}
6391
}

src/Symfony/Component/Messenger/Handler/HandlersLocatorInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ interface HandlersLocatorInterface
2525
/**
2626
* Returns the handlers for the given message name.
2727
*
28-
* @return iterable|callable[] Indexed by handler alias if available
28+
* @return iterable|HandlerDescriptor[] Indexed by handler alias if available
2929
*/
3030
public function getHandlers(Envelope $envelope): iterable;
3131
}

src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Exception\HandlerFailedException;
1818
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
19+
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
1920
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
2021
use Symfony\Component\Messenger\Stamp\HandledStamp;
2122

@@ -54,17 +55,16 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
5455
];
5556

5657
$exceptions = [];
57-
foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) {
58-
$alias = \is_string($alias) ? $alias : null;
59-
60-
if ($this->messageHasAlreadyBeenHandled($envelope, $handler, $alias)) {
58+
foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
59+
if ($this->messageHasAlreadyBeenHandled($envelope, $handlerDescriptor)) {
6160
continue;
6261
}
6362

6463
try {
65-
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias);
64+
$handler = $handlerDescriptor->getHandler();
65+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
6666
$envelope = $envelope->with($handledStamp);
67-
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
67+
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getHandlerName()]);
6868
} catch (\Throwable $e) {
6969
$exceptions[] = $e;
7070
}
@@ -85,12 +85,11 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
8585
return $stack->next()->handle($envelope, $stack);
8686
}
8787

88-
private function messageHasAlreadyBeenHandled(Envelope $envelope, callable $handler, ?string $alias): bool
88+
private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
8989
{
9090
$some = array_filter($envelope
91-
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) {
92-
return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) &&
93-
$stamp->getHandlerAlias() === $alias;
91+
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) {
92+
return $stamp->getHandlerName() === $handlerDescriptor->getName();
9493
});
9594

9695
return \count($some) > 0;

0 commit comments

Comments
 (0)
0