diff --git a/composer.json b/composer.json index 3ed1464ae1aff..aa77cf0cc55ba 100644 --- a/composer.json +++ b/composer.json @@ -55,6 +55,7 @@ "symfony/intl": "self.version", "symfony/ldap": "self.version", "symfony/lock": "self.version", + "symfony/messenger": "self.version", "symfony/monolog-bridge": "self.version", "symfony/options-resolver": "self.version", "symfony/process": "self.version", diff --git a/src/Symfony/Bundle/FrameworkBundle/Command/MessengerConsumeMessagesCommand.php b/src/Symfony/Bundle/FrameworkBundle/Command/MessengerConsumeMessagesCommand.php new file mode 100644 index 0000000000000..73032c88aff90 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Command/MessengerConsumeMessagesCommand.php @@ -0,0 +1,87 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Bundle\FrameworkBundle\Command; + +use Psr\Container\ContainerInterface; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver; +use Symfony\Component\Messenger\Transport\ReceiverInterface; +use Symfony\Component\Messenger\Worker; + +/** + * @author Samuel Roze + */ +class MessengerConsumeMessagesCommand extends Command +{ + protected static $defaultName = 'messenger:consume-messages'; + + private $bus; + private $receiverLocator; + + public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator) + { + parent::__construct(); + + $this->bus = $bus; + $this->receiverLocator = $receiverLocator; + } + + /** + * {@inheritdoc} + */ + protected function configure() + { + $this + ->setDefinition(array( + new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'), + new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'), + )) + ->setDescription('Consumes messages') + ->setHelp(<<<'EOF' +The %command.name% command consumes messages and dispatches them to the message bus. + + php %command.full_name% + +Use the --limit option to limit the number of messages received: + + php %command.full_name% --limit=10 +EOF + ) + ; + } + + /** + * {@inheritdoc} + */ + protected function execute(InputInterface $input, OutputInterface $output) + { + if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) { + throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName)); + } + + if (!($receiver = $this->receiverLocator->get($receiverName)) instanceof ReceiverInterface) { + throw new \RuntimeException(sprintf('Receiver "%s" is not a valid message consumer. It must implement the "%s" interface.', $receiverName, ReceiverInterface::class)); + } + + if ($limit = $input->getOption('limit')) { + $receiver = new MaximumCountReceiver($receiver, $limit); + } + + $worker = new Worker($receiver, $this->bus); + $worker->run(); + } +} diff --git a/src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php b/src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php index 2f1b2a9352410..954961119eb66 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php +++ b/src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php @@ -18,6 +18,7 @@ use Symfony\Component\HttpFoundation\RequestStack; use Symfony\Component\HttpFoundation\Session\SessionInterface; use Symfony\Component\HttpKernel\HttpKernelInterface; +use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Routing\RouterInterface; use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; use Symfony\Component\Security\Core\Authorization\AuthorizationCheckerInterface; @@ -84,6 +85,7 @@ public static function getSubscribedServices() 'security.token_storage' => '?'.TokenStorageInterface::class, 'security.csrf.token_manager' => '?'.CsrfTokenManagerInterface::class, 'parameter_bag' => '?'.ContainerInterface::class, + 'message_bus' => '?'.MessageBusInterface::class, ); } } diff --git a/src/Symfony/Bundle/FrameworkBundle/Controller/ControllerTrait.php b/src/Symfony/Bundle/FrameworkBundle/Controller/ControllerTrait.php index 8b639dc54d812..8d0525dd89907 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Controller/ControllerTrait.php +++ b/src/Symfony/Bundle/FrameworkBundle/Controller/ControllerTrait.php @@ -382,4 +382,20 @@ protected function isCsrfTokenValid(string $id, ?string $token): bool return $this->container->get('security.csrf.token_manager')->isTokenValid(new CsrfToken($id, $token)); } + + /** + * Dispatches a message to the bus. + * + * @param object $message The message to dispatch + * + * @final + */ + protected function dispatchMessage($message) + { + if (!$this->container->has('message_bus')) { + throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/messenger".'); + } + + return $this->container->get('message_bus')->dispatch($message); + } } diff --git a/src/Symfony/Bundle/FrameworkBundle/DataCollector/MessengerDataCollector.php b/src/Symfony/Bundle/FrameworkBundle/DataCollector/MessengerDataCollector.php new file mode 100644 index 0000000000000..99a133d8b1179 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/DataCollector/MessengerDataCollector.php @@ -0,0 +1,93 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Bundle\FrameworkBundle\DataCollector; + +use Symfony\Component\HttpFoundation\Request; +use Symfony\Component\HttpFoundation\Response; +use Symfony\Component\HttpKernel\DataCollector\DataCollector; +use Symfony\Component\Messenger\MiddlewareInterface; + +/** + * @author Samuel Roze + */ +class MessengerDataCollector extends DataCollector implements MiddlewareInterface +{ + /** + * {@inheritdoc} + */ + public function collect(Request $request, Response $response, \Exception $exception = null) + { + return $this->data; + } + + /** + * {@inheritdoc} + */ + public function getName() + { + return 'messages'; + } + + /** + * {@inheritdoc} + */ + public function reset() + { + $this->data = array(); + } + + /** + * {@inheritdoc} + */ + public function handle($message, callable $next) + { + $debugRepresentation = array( + 'message' => array( + 'type' => get_class($message), + ), + ); + + $exception = null; + try { + $result = $next($message); + + if (is_object($result)) { + $debugRepresentation['result'] = array( + 'type' => get_class($result), + ); + } else { + $debugRepresentation['result'] = array( + 'type' => gettype($result), + 'value' => $result, + ); + } + } catch (\Throwable $exception) { + $debugRepresentation['exception'] = array( + 'type' => get_class($exception), + 'message' => $exception->getMessage(), + ); + } + + $this->data[] = $debugRepresentation; + + if (null !== $exception) { + throw $exception; + } + + return $result; + } + + public function getMessages(): array + { + return $this->data; + } +} diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php index eb3f48350dd41..15df976355909 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php @@ -22,6 +22,7 @@ use Symfony\Component\Lock\Lock; use Symfony\Component\Lock\Store\SemaphoreStore; use Symfony\Component\Security\Csrf\CsrfTokenManagerInterface; +use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Serializer\Serializer; use Symfony\Component\Translation\Translator; use Symfony\Component\Validator\Validation; @@ -102,6 +103,7 @@ public function getConfigTreeBuilder() $this->addPhpErrorsSection($rootNode); $this->addWebLinkSection($rootNode); $this->addLockSection($rootNode); + $this->addMessengerSection($rootNode); return $treeBuilder; } @@ -956,4 +958,35 @@ private function addWebLinkSection(ArrayNodeDefinition $rootNode) ->end() ; } + + private function addMessengerSection(ArrayNodeDefinition $rootNode) + { + $rootNode + ->children() + ->arrayNode('messenger') + ->info('Messenger configuration') + ->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}() + ->children() + ->arrayNode('routing') + ->useAttributeAsKey('message_class') + ->prototype('array') + ->beforeNormalization() + ->ifString() + ->then(function ($v) { + return array('senders' => array($v)); + }) + ->end() + ->children() + ->arrayNode('senders') + ->requiresAtLeastOneElement() + ->prototype('scalar')->end() + ->end() + ->end() + ->end() + ->end() + ->end() + ->end() + ->end() + ; + } } diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index b36b179775671..842a51744b9e8 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -59,6 +59,8 @@ use Symfony\Component\Lock\LockInterface; use Symfony\Component\Lock\Store\StoreFactory; use Symfony\Component\Lock\StoreInterface; +use Symfony\Component\Messenger\Transport\ReceiverInterface; +use Symfony\Component\Messenger\Transport\SenderInterface; use Symfony\Component\PropertyAccess\PropertyAccessor; use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface; use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface; @@ -266,6 +268,12 @@ public function load(array $configs, ContainerBuilder $container) $this->registerLockConfiguration($config['lock'], $container, $loader); } + if ($this->isConfigEnabled($container, $config['messenger'])) { + $this->registerMessengerConfiguration($config['messenger'], $container, $loader); + } else { + $container->removeDefinition('console.command.messenger_consume_messages'); + } + if ($this->isConfigEnabled($container, $config['web_link'])) { if (!class_exists(HttpHeaderSerializer::class)) { throw new LogicException('WebLink support cannot be enabled as the WebLink component is not installed.'); @@ -333,6 +341,10 @@ public function load(array $configs, ContainerBuilder $container) ->addTag('validator.constraint_validator'); $container->registerForAutoconfiguration(ObjectInitializerInterface::class) ->addTag('validator.initializer'); + $container->registerForAutoconfiguration(ReceiverInterface::class) + ->addTag('messenger.receiver'); + $container->registerForAutoconfiguration(SenderInterface::class) + ->addTag('messenger.sender'); if (!$container->getParameter('kernel.debug')) { // remove tagged iterator argument for resource checkers @@ -1410,6 +1422,26 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont } } + private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader) + { + $loader->load('messenger.xml'); + + $senderLocatorMapping = array(); + $messageToSenderIdsMapping = array(); + foreach ($config['routing'] as $message => $messageConfiguration) { + foreach ($messageConfiguration['senders'] as $sender) { + if (null !== $sender) { + $senderLocatorMapping[$sender] = new Reference($sender); + } + } + + $messageToSenderIdsMapping[$message] = $messageConfiguration['senders']; + } + + $container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping); + $container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping); + } + private function registerCacheConfiguration(array $config, ContainerBuilder $container) { $version = new Parameter('container.build_id'); diff --git a/src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php b/src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php index 9511c4bc61ce0..6b1de4f03f33e 100644 --- a/src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php +++ b/src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php @@ -34,6 +34,7 @@ use Symfony\Component\HttpKernel\DependencyInjection\RegisterControllerArgumentLocatorsPass; use Symfony\Component\HttpKernel\DependencyInjection\RemoveEmptyControllerArgumentLocatorsPass; use Symfony\Component\HttpKernel\DependencyInjection\ResettableServicePass; +use Symfony\Component\Messenger\DependencyInjection\MessengerPass; use Symfony\Component\PropertyInfo\DependencyInjection\PropertyInfoPass; use Symfony\Component\Routing\DependencyInjection\RoutingResolverPass; use Symfony\Component\Serializer\DependencyInjection\SerializerPass; @@ -118,6 +119,7 @@ public function build(ContainerBuilder $container) $container->addCompilerPass(new ResettableServicePass()); $container->addCompilerPass(new TestServiceContainerWeakRefPass(), PassConfig::TYPE_BEFORE_REMOVING, -32); $container->addCompilerPass(new TestServiceContainerRealRefPass(), PassConfig::TYPE_AFTER_REMOVING); + $this->addCompilerPassIfExists($container, MessengerPass::class); if ($container->getParameter('kernel.debug')) { $container->addCompilerPass(new AddDebugLogProcessorPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, -32); diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index 5611fa3ccdba3..9c1ed0e163780 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -69,6 +69,13 @@ + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml new file mode 100644 index 0000000000000..79e9a3980b5e5 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -0,0 +1,70 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php index 84921d9737d60..9f9e13317d7db 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php @@ -17,6 +17,7 @@ use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; use Symfony\Component\Config\Definition\Processor; use Symfony\Component\Lock\Store\SemaphoreStore; +use Symfony\Component\Messenger\MessageBusInterface; class ConfigurationTest extends TestCase { @@ -249,6 +250,10 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor ), ), ), + 'messenger' => array( + 'enabled' => !class_exists(FullStack::class) && class_exists(MessageBusInterface::class), + 'routing' => array(), + ), ); } } diff --git a/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Collector/messages.html.twig b/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Collector/messages.html.twig new file mode 100644 index 0000000000000..f6be519f17372 --- /dev/null +++ b/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Collector/messages.html.twig @@ -0,0 +1,60 @@ +{% extends '@WebProfiler/Profiler/layout.html.twig' %} + +{% import _self as helper %} + +{% block menu %} + + {{ include('@WebProfiler/Icon/messages.svg') }} + Messages + + {% if collector.messages|length > 0 %} + + {{ collector.messages|length }} + + {% endif %} + +{% endblock %} + +{% block panel %} +

Messages

+ + {% if collector.messages is empty %} +

No messages

+ {% else %} + + + + + + + + + {% for message in collector.messages %} + + + + + {% endfor %} + +
MessageResult
{{ message.message.type }} + {% if message.result.type is defined %} + {{ message.result.type }} + {% endif %} + + {% if message.exception.type is defined %} + {{ message.exception.type }} + {% endif %} +
+ {% endif %} +{% endblock %} + +{% block toolbar %} + {% set color_code = 'normal' %} + {% set message_count = 0 %} + {% set icon %} + {{ include('@WebProfiler/Icon/messages.svg') }} + {{ message_count }} + {% endset %} + + {{ include('@WebProfiler/Profiler/toolbar_item.html.twig', { link: 'messages', status: color_code }) }} +{% endblock %} diff --git a/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Icon/messages.svg b/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Icon/messages.svg new file mode 100644 index 0000000000000..2fd49b55fe6d5 --- /dev/null +++ b/src/Symfony/Bundle/WebProfilerBundle/Resources/views/Icon/messages.svg @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php b/src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php new file mode 100644 index 0000000000000..469c5672b9672 --- /dev/null +++ b/src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php @@ -0,0 +1,55 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Asynchronous\Middleware; + +use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface; +use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage; +use Symfony\Component\Messenger\MiddlewareInterface; + +/** + * @author Samuel Roze + */ +class SendMessageMiddleware implements MiddlewareInterface +{ + private $senderLocator; + + public function __construct(SenderLocatorInterface $senderLocator) + { + $this->senderLocator = $senderLocator; + } + + /** + * {@inheritdoc} + */ + public function handle($message, callable $next) + { + if ($message instanceof ReceivedMessage) { + return $next($message->getMessage()); + } + + if (!empty($senders = $this->senderLocator->getSendersForMessage($message))) { + foreach ($senders as $sender) { + if (null === $sender) { + continue; + } + + $sender->send($message); + } + + if (!in_array(null, $senders, true)) { + return; + } + } + + return $next($message); + } +} diff --git a/src/Symfony/Component/Messenger/Asynchronous/Routing/SenderLocator.php b/src/Symfony/Component/Messenger/Asynchronous/Routing/SenderLocator.php new file mode 100644 index 0000000000000..24a85332250c3 --- /dev/null +++ b/src/Symfony/Component/Messenger/Asynchronous/Routing/SenderLocator.php @@ -0,0 +1,44 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Asynchronous\Routing; + +use Psr\Container\ContainerInterface; + +/** + * @author Samuel Roze + */ +class SenderLocator implements SenderLocatorInterface +{ + private $senderServiceLocator; + private $messageToSenderIdsMapping; + + public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdsMapping) + { + $this->senderServiceLocator = $senderServiceLocator; + $this->messageToSenderIdsMapping = $messageToSenderIdsMapping; + } + + /** + * {@inheritdoc} + */ + public function getSendersForMessage($message): array + { + $senderIds = $this->messageToSenderIdsMapping[get_class($message)] ?? $this->messageToSenderIdsMapping['*'] ?? array(); + + $senders = array(); + foreach ($senderIds as $senderId) { + $senders[] = $this->senderServiceLocator->get($senderId); + } + + return $senders; + } +} diff --git a/src/Symfony/Component/Messenger/Asynchronous/Routing/SenderLocatorInterface.php b/src/Symfony/Component/Messenger/Asynchronous/Routing/SenderLocatorInterface.php new file mode 100644 index 0000000000000..d97508b31a2f1 --- /dev/null +++ b/src/Symfony/Component/Messenger/Asynchronous/Routing/SenderLocatorInterface.php @@ -0,0 +1,31 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Asynchronous\Routing; + +use Symfony\Component\Messenger\Transport\SenderInterface; + +/** + * @author Samuel Roze + * + * @experimental in 4.1 + */ +interface SenderLocatorInterface +{ + /** + * Gets the sender (if applicable) for the given message object. + * + * @param object $message + * + * @return SenderInterface[] + */ + public function getSendersForMessage($message): array; +} diff --git a/src/Symfony/Component/Messenger/Asynchronous/Transport/ReceivedMessage.php b/src/Symfony/Component/Messenger/Asynchronous/Transport/ReceivedMessage.php new file mode 100644 index 0000000000000..1b1298da63d32 --- /dev/null +++ b/src/Symfony/Component/Messenger/Asynchronous/Transport/ReceivedMessage.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Asynchronous\Transport; + +use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware; + +/** + * Wraps a received message. This is mainly used by the `SendMessageMiddleware` middleware to identify + * a message should not be sent if it was just received. + * + * @see SendMessageMiddleware + * + * @author Samuel Roze + */ +final class ReceivedMessage +{ + private $message; + + public function __construct($message) + { + $this->message = $message; + } + + public function getMessage() + { + return $this->message; + } +} diff --git a/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php b/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php new file mode 100644 index 0000000000000..dd17a94b13bea --- /dev/null +++ b/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php @@ -0,0 +1,44 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Asynchronous\Transport; + +use Symfony\Component\Messenger\Transport\ReceiverInterface; + +/** + * @author Samuel Roze + */ +class WrapIntoReceivedMessage implements ReceiverInterface +{ + private $decoratedReceiver; + + public function __construct(ReceiverInterface $decoratedConsumer) + { + $this->decoratedReceiver = $decoratedConsumer; + } + + public function receive(): iterable + { + $iterator = $this->decoratedReceiver->receive(); + + foreach ($iterator as $message) { + try { + yield new ReceivedMessage($message); + } catch (\Throwable $e) { + if (!$iterator instanceof \Generator) { + throw $e; + } + + $iterator->throw($e); + } + } + } +} diff --git a/src/Symfony/Component/Messenger/ContainerHandlerLocator.php b/src/Symfony/Component/Messenger/ContainerHandlerLocator.php new file mode 100644 index 0000000000000..4b80d8ca8a9b2 --- /dev/null +++ b/src/Symfony/Component/Messenger/ContainerHandlerLocator.php @@ -0,0 +1,41 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +use Psr\Container\ContainerInterface; +use Symfony\Component\Messenger\Exception\NoHandlerForMessageException; + +/** + * @author Miha Vrhovnik + * @author Samuel Roze + */ +class ContainerHandlerLocator implements HandlerLocatorInterface +{ + private $container; + + public function __construct(ContainerInterface $container) + { + $this->container = $container; + } + + public function resolve($message): callable + { + $messageClass = get_class($message); + $handlerKey = 'handler.'.$messageClass; + + if (!$this->container->has($handlerKey)) { + throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $messageClass)); + } + + return $this->container->get($handlerKey); + } +} diff --git a/src/Symfony/Component/Messenger/Debug/LoggingMiddleware.php b/src/Symfony/Component/Messenger/Debug/LoggingMiddleware.php new file mode 100644 index 0000000000000..8aea72b26332e --- /dev/null +++ b/src/Symfony/Component/Messenger/Debug/LoggingMiddleware.php @@ -0,0 +1,58 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Debug; + +use Symfony\Component\Messenger\MiddlewareInterface; +use Psr\Log\LoggerInterface; + +/** + * @author Samuel Roze + */ +class LoggingMiddleware implements MiddlewareInterface +{ + private $logger; + + public function __construct(LoggerInterface $logger) + { + $this->logger = $logger; + } + + /** + * {@inheritdoc} + */ + public function handle($message, callable $next) + { + $this->logger->debug('Starting handling message {class}', array( + 'message' => $message, + 'class' => get_class($message), + )); + + try { + $result = $next($message); + } catch (\Throwable $e) { + $this->logger->warning('An exception occurred while handling message {class}', array( + 'message' => $message, + 'exception' => $e, + 'class' => get_class($message), + )); + + throw $e; + } + + $this->logger->debug('Finished handling message {class}', array( + 'message' => $message, + 'class' => get_class($message), + )); + + return $result; + } +} diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php new file mode 100644 index 0000000000000..a0dff8ac9ad83 --- /dev/null +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -0,0 +1,139 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\DependencyInjection; + +use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; +use Symfony\Component\DependencyInjection\Compiler\PriorityTaggedServiceTrait; +use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass; +use Symfony\Component\DependencyInjection\ContainerBuilder; +use Symfony\Component\DependencyInjection\Definition; +use Symfony\Component\DependencyInjection\Exception\RuntimeException; +use Symfony\Component\DependencyInjection\Reference; +use Symfony\Component\Messenger\Handler\ChainHandler; + +/** + * @author Samuel Roze + */ +class MessengerPass implements CompilerPassInterface +{ + use PriorityTaggedServiceTrait; + + private $messageBusService; + private $messageHandlerResolverService; + private $handlerTag; + + public function __construct(string $messageBusService = 'message_bus', string $messageHandlerResolverService = 'messenger.handler_resolver', string $handlerTag = 'message_handler') + { + $this->messageBusService = $messageBusService; + $this->messageHandlerResolverService = $messageHandlerResolverService; + $this->handlerTag = $handlerTag; + } + + /** + * {@inheritdoc} + */ + public function process(ContainerBuilder $container) + { + if (!$container->hasDefinition($this->messageBusService)) { + return; + } + + if (!$container->getParameter('kernel.debug') || !$container->has('logger')) { + $container->removeDefinition('messenger.middleware.debug.logging'); + } + + $this->registerReceivers($container); + $this->registerHandlers($container); + } + + private function registerHandlers(ContainerBuilder $container) + { + $handlersByMessage = array(); + + foreach ($container->findTaggedServiceIds($this->handlerTag, true) as $serviceId => $tags) { + foreach ($tags as $tag) { + $handles = $tag['handles'] ?? $this->guessHandledClass($container, $serviceId); + + if (!class_exists($handles)) { + $messageClassLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : 'declared in `__invoke` function'; + + throw new RuntimeException(sprintf('The message class "%s" %s of service "%s" does not exist.', $messageClassLocation, $handles, $serviceId)); + } + + $priority = $tag['priority'] ?? 0; + $handlersByMessage[$handles][$priority][] = new Reference($serviceId); + } + } + + foreach ($handlersByMessage as $message => $handlers) { + krsort($handlersByMessage[$message]); + $handlersByMessage[$message] = call_user_func_array('array_merge', $handlersByMessage[$message]); + } + + $definitions = array(); + foreach ($handlersByMessage as $message => $handlers) { + if (1 === count($handlers)) { + $handlersByMessage[$message] = current($handlers); + } else { + $d = new Definition(ChainHandler::class, array($handlers)); + $d->setPrivate(true); + $serviceId = hash('sha1', $message); + $definitions[$serviceId] = $d; + $handlersByMessage[$message] = new Reference($serviceId); + } + } + $container->addDefinitions($definitions); + + $handlersLocatorMapping = array(); + foreach ($handlersByMessage as $message => $handler) { + $handlersLocatorMapping['handler.'.$message] = $handler; + } + + $handlerResolver = $container->getDefinition($this->messageHandlerResolverService); + $handlerResolver->replaceArgument(0, ServiceLocatorTagPass::register($container, $handlersLocatorMapping)); + } + + private function guessHandledClass(ContainerBuilder $container, string $serviceId): string + { + $reflection = $container->getReflectionClass($container->getDefinition($serviceId)->getClass()); + + try { + $method = $reflection->getMethod('__invoke'); + } catch (\ReflectionException $e) { + throw new RuntimeException(sprintf('Service "%s" should have an `__invoke` function.', $serviceId)); + } + + $parameters = $method->getParameters(); + if (1 !== count($parameters)) { + throw new RuntimeException(sprintf('`__invoke` function of service "%s" must have exactly one parameter.', $serviceId)); + } + + $parameter = $parameters[0]; + if (null === $parameter->getClass()) { + throw new RuntimeException(sprintf('The parameter of `__invoke` function of service "%s" must type hint the message class it handles.', $serviceId)); + } + + return $parameter->getClass()->getName(); + } + + private function registerReceivers(ContainerBuilder $container) + { + $receiverMapping = array(); + foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) { + foreach ($tags as $tag) { + $receiverMapping[$tag['id'] ?? $id] = new Reference($id); + } + } + + $container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping); + } +} diff --git a/src/Symfony/Component/Messenger/Exception/ExceptionInterface.php b/src/Symfony/Component/Messenger/Exception/ExceptionInterface.php new file mode 100644 index 0000000000000..56f665b9ee804 --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/ExceptionInterface.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * Base Message component's exception. + * + * @author Samuel Roze + */ +interface ExceptionInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Exception/NoHandlerForMessageException.php b/src/Symfony/Component/Messenger/Exception/NoHandlerForMessageException.php new file mode 100644 index 0000000000000..20f6fcd326b50 --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/NoHandlerForMessageException.php @@ -0,0 +1,19 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * @author Samuel Roze + */ +class NoHandlerForMessageException extends \RuntimeException implements ExceptionInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Handler/ChainHandler.php b/src/Symfony/Component/Messenger/Handler/ChainHandler.php new file mode 100644 index 0000000000000..f73f92838101a --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/ChainHandler.php @@ -0,0 +1,48 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +/** + * Represents a collection of message handlers. + * + * @author Samuel Roze + */ +class ChainHandler +{ + /** + * @var callable[] + */ + private $handlers; + + /** + * @param callable[] $handlers + */ + public function __construct(array $handlers) + { + if (empty($handlers)) { + throw new \InvalidArgumentException('A collection of message handlers requires at least one handler.'); + } + + $this->handlers = $handlers; + } + + public function __invoke($message) + { + $results = array(); + + foreach ($this->handlers as $handler) { + $results[] = $handler($message); + } + + return $results; + } +} diff --git a/src/Symfony/Component/Messenger/HandlerLocator.php b/src/Symfony/Component/Messenger/HandlerLocator.php new file mode 100644 index 0000000000000..232c961169e8c --- /dev/null +++ b/src/Symfony/Component/Messenger/HandlerLocator.php @@ -0,0 +1,41 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +use Symfony\Component\Messenger\Exception\NoHandlerForMessageException; + +/** + * @author Samuel Roze + */ +class HandlerLocator implements HandlerLocatorInterface +{ + /** + * Maps a message (its class) to a given handler. + */ + private $messageToHandlerMapping; + + public function __construct(array $messageToHandlerMapping = array()) + { + $this->messageToHandlerMapping = $messageToHandlerMapping; + } + + public function resolve($message): callable + { + $messageKey = get_class($message); + + if (!isset($this->messageToHandlerMapping[$messageKey])) { + throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $messageKey)); + } + + return $this->messageToHandlerMapping[$messageKey]; + } +} diff --git a/src/Symfony/Component/Messenger/HandlerLocatorInterface.php b/src/Symfony/Component/Messenger/HandlerLocatorInterface.php new file mode 100644 index 0000000000000..089acacc32323 --- /dev/null +++ b/src/Symfony/Component/Messenger/HandlerLocatorInterface.php @@ -0,0 +1,33 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +use Symfony\Component\Messenger\Exception\NoHandlerForMessageException; + +/** + * @author Samuel Roze + * + * @experimental in 4.1 + */ +interface HandlerLocatorInterface +{ + /** + * Returns the handler for the given message. + * + * @param object $message + * + * @throws NoHandlerForMessageException + * + * @return callable + */ + public function resolve($message): callable; +} diff --git a/src/Symfony/Component/Messenger/MessageBus.php b/src/Symfony/Component/Messenger/MessageBus.php new file mode 100644 index 0000000000000..1843ba02af066 --- /dev/null +++ b/src/Symfony/Component/Messenger/MessageBus.php @@ -0,0 +1,59 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +/** + * @author Samuel Roze + * @author Matthias Noback + */ +class MessageBus implements MessageBusInterface +{ + private $middlewares; + + /** + * @var MiddlewareInterface[]|null + */ + private $indexedMiddlewares; + + /** + * @param MiddlewareInterface[]|iterable $middlewares + */ + public function __construct(iterable $middlewares = array()) + { + $this->middlewares = $middlewares; + } + + /** + * {@inheritdoc} + */ + public function dispatch($message) + { + return call_user_func($this->callableForNextMiddleware(0), $message); + } + + private function callableForNextMiddleware(int $index): callable + { + if (null === $this->indexedMiddlewares) { + $this->indexedMiddlewares = is_array($this->middlewares) ? array_values($this->middlewares) : iterator_to_array($this->middlewares, false); + } + + if (!isset($this->indexedMiddlewares[$index])) { + return function () {}; + } + + $middleware = $this->indexedMiddlewares[$index]; + + return function ($message) use ($middleware, $index) { + return $middleware->handle($message, $this->callableForNextMiddleware($index + 1)); + }; + } +} diff --git a/src/Symfony/Component/Messenger/MessageBusInterface.php b/src/Symfony/Component/Messenger/MessageBusInterface.php new file mode 100644 index 0000000000000..1d441ea568ff7 --- /dev/null +++ b/src/Symfony/Component/Messenger/MessageBusInterface.php @@ -0,0 +1,31 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +/** + * @author Samuel Roze + * + * @experimental in 4.1 + */ +interface MessageBusInterface +{ + /** + * Dispatches the given message. + * + * The bus can return a value coming from handlers, but is not required to do so. + * + * @param object $message + * + * @return mixed + */ + public function dispatch($message); +} diff --git a/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php new file mode 100644 index 0000000000000..428fd99231bba --- /dev/null +++ b/src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php @@ -0,0 +1,41 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Middleware; + +use Symfony\Component\Messenger\MiddlewareInterface; +use Symfony\Component\Messenger\HandlerLocatorInterface; + +/** + * @author Samuel Roze + */ +class HandleMessageMiddleware implements MiddlewareInterface +{ + private $messageHandlerResolver; + + public function __construct(HandlerLocatorInterface $messageHandlerResolver) + { + $this->messageHandlerResolver = $messageHandlerResolver; + } + + /** + * {@inheritdoc} + */ + public function handle($message, callable $next) + { + $handler = $this->messageHandlerResolver->resolve($message); + $result = $handler($message); + + $next($message); + + return $result; + } +} diff --git a/src/Symfony/Component/Messenger/MiddlewareInterface.php b/src/Symfony/Component/Messenger/MiddlewareInterface.php new file mode 100644 index 0000000000000..98ae70119c488 --- /dev/null +++ b/src/Symfony/Component/Messenger/MiddlewareInterface.php @@ -0,0 +1,27 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +/** + * @author Samuel Roze + * + * @experimental in 4.1 + */ +interface MiddlewareInterface +{ + /** + * @param object $message + * + * @return mixed + */ + public function handle($message, callable $next); +} diff --git a/src/Symfony/Component/Messenger/README.md b/src/Symfony/Component/Messenger/README.md new file mode 100644 index 0000000000000..1b93b36ea0cec --- /dev/null +++ b/src/Symfony/Component/Messenger/README.md @@ -0,0 +1,16 @@ +Messenger Component +=================== + +**This Component is experimental**. [Experimental features](https://symfony.com/doc/current/contributing/code/experimental.html) +are not covered by Symfony's BC-break policy. + +The Messenger component helps application send and receive messages to/from other applications or via +message queues. + +Resources +--------- + + * [Contributing](https://symfony.com/doc/current/contributing/index.html) + * [Report issues](https://github.com/symfony/symfony/issues) and + [send Pull Requests](https://github.com/symfony/symfony/pulls) + in the [main Symfony repository](https://github.com/symfony/symfony) diff --git a/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php new file mode 100644 index 0000000000000..436a6df71a0ed --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php @@ -0,0 +1,100 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Asynchronous\Middleware; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware; +use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface; +use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\SenderInterface; + +class SendMessageMiddlewareTest extends TestCase +{ + public function testItSendsTheMessageToAssignedSender() + { + $message = new DummyMessage('Hey'); + $sender = $this->createMock(SenderInterface::class); + $next = $this->createPartialMock(\stdClass::class, ['__invoke']); + + $middleware = new SendMessageMiddleware(new InMemorySenderLocator(array( + $sender, + ))); + + $sender->expects($this->once())->method('send')->with($message); + $next->expects($this->never())->method($this->anything()); + + $middleware->handle($message, $next); + } + + public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull() + { + $message = new DummyMessage('Hey'); + $sender = $this->createMock(SenderInterface::class); + $next = $this->createPartialMock(\stdClass::class, ['__invoke']); + + $middleware = new SendMessageMiddleware(new InMemorySenderLocator(array( + $sender, + null, + ))); + + $sender->expects($this->once())->method('send')->with($message); + $next->expects($this->once())->method($this->anything()); + + $middleware->handle($message, $next); + } + + public function testItCallsTheNextMiddlewareWhenNoSenderForThisMessage() + { + $message = new DummyMessage('Hey'); + $next = $this->createPartialMock(\stdClass::class, ['__invoke']); + + $middleware = new SendMessageMiddleware(new InMemorySenderLocator(array())); + + $next->expects($this->once())->method($this->anything()); + + $middleware->handle($message, $next); + } + + public function testItSkipsReceivedMessages() + { + $innerMessage = new DummyMessage('Hey'); + $message = new ReceivedMessage($innerMessage); + + $sender = $this->createMock(SenderInterface::class); + $next = $this->createPartialMock(\stdClass::class, ['__invoke']); + + $middleware = new SendMessageMiddleware(new InMemorySenderLocator(array( + $sender, + ))); + + $sender->expects($this->never())->method('send'); + $next->expects($this->once())->method('__invoke')->with($innerMessage); + + $middleware->handle($message, $next); + } +} + +class InMemorySenderLocator implements SenderLocatorInterface +{ + private $senders; + + public function __construct(array $senders) + { + $this->senders = $senders; + } + + public function getSendersForMessage($message): array + { + return $this->senders; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Asynchronous/Routing/SenderLocatorTest.php b/src/Symfony/Component/Messenger/Tests/Asynchronous/Routing/SenderLocatorTest.php new file mode 100644 index 0000000000000..e92882da39abb --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Asynchronous/Routing/SenderLocatorTest.php @@ -0,0 +1,61 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Asynchronous\Routing; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\DependencyInjection\Container; +use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Tests\Fixtures\SecondMessage; +use Symfony\Component\Messenger\Transport\SenderInterface; + +class SenderLocatorTest extends TestCase +{ + public function testItReturnsTheSenderBasedOnTheMessageClass() + { + $sender = $this->createMock(SenderInterface::class); + $container = new Container(); + $container->set('my_amqp_sender', $sender); + + $locator = new SenderLocator($container, [ + DummyMessage::class => [ + 'my_amqp_sender', + ] + ]); + + $this->assertEquals([$sender], $locator->getSendersForMessage(new DummyMessage('Hello'))); + $this->assertEquals([], $locator->getSendersForMessage(new SecondMessage())); + } + + public function testItSupportsAWildcardInsteadOfTheMessageClass() + { + $container = new Container(); + + $sender = $this->createMock(SenderInterface::class); + $container->set('my_amqp_sender', $sender); + + $apiSender = $this->createMock(SenderInterface::class); + $container->set('my_api_sender', $apiSender); + + $locator = new SenderLocator($container, [ + DummyMessage::class => [ + 'my_amqp_sender', + ], + '*' => [ + 'my_api_sender' + ] + ]); + + $this->assertEquals([$sender], $locator->getSendersForMessage(new DummyMessage('Hello'))); + $this->assertEquals([$apiSender], $locator->getSendersForMessage(new SecondMessage())); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessage.php b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessage.php new file mode 100644 index 0000000000000..fb02ca7b866ae --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/DummyMessage.php @@ -0,0 +1,18 @@ +message = $message; + } + + public function getMessage(): string + { + return $this->message; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Fixtures/SecondMessage.php b/src/Symfony/Component/Messenger/Tests/Fixtures/SecondMessage.php new file mode 100644 index 0000000000000..98f331cb6dfa4 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Fixtures/SecondMessage.php @@ -0,0 +1,7 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\MessageBus; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\MiddlewareInterface; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; + +class MessageBusTest extends TestCase +{ + public function testItHasTheRightInterface() + { + $bus = new MessageBus(); + + $this->assertInstanceOf(MessageBusInterface::class, $bus); + } + + public function testItCallsTheMiddlewaresAndChainTheReturnValue() + { + $message = new DummyMessage('Hello'); + $responseFromDepthMiddleware = 1234; + + $firstMiddleware = $this->createMock(MiddlewareInterface::class); + $firstMiddleware->expects($this->once()) + ->method('handle') + ->with($message, $this->anything()) + ->will($this->returnCallback(function($message, $next) { + return $next($message); + })); + + $secondMiddleware = $this->createMock(MiddlewareInterface::class); + $secondMiddleware->expects($this->once()) + ->method('handle') + ->with($message, $this->anything()) + ->willReturn($responseFromDepthMiddleware); + + $bus = new MessageBus([ + $firstMiddleware, + $secondMiddleware, + ]); + + $this->assertEquals($responseFromDepthMiddleware, $bus->dispatch($message)); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php new file mode 100644 index 0000000000000..4098d8526b273 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Middleware/HandleMessageMiddlewareTest.php @@ -0,0 +1,39 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Middleware; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\HandlerLocator; +use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; + +class HandleMessageMiddlewareTest extends TestCase +{ + public function testItCallsTheHandlerAndNextMiddleware() + { + $message = new DummyMessage('Hey'); + + $handler = $this->createPartialMock(\stdClass::class, ['__invoke']); + $handler->method('__invoke')->willReturn('Hello'); + + $next = $this->createPartialMock(\stdClass::class, ['__invoke']); + + $middleware = new HandleMessageMiddleware(new HandlerLocator(array( + DummyMessage::class => $handler, + ))); + + $handler->expects($this->once())->method('__invoke')->with($message); + $next->expects($this->once())->method('__invoke')->with($message); + + $middleware->handle($message, $next); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php b/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php new file mode 100644 index 0000000000000..8b6199144b7c7 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Transport/Serialization/SerializerTest.php @@ -0,0 +1,47 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Transport\Serialization; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Serializer as SerializerComponent; +use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; + +class SerializerTest extends TestCase +{ + public function testEncodedIsDecodable() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $message = new DummyMessage('Hello'); + + $this->assertEquals($message, $serializer->decode($serializer->encode($message))); + } + + public function testEncodedIsHavingTheBodyAndTypeHeader() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $encoded = $serializer->encode(new DummyMessage('Hello')); + + $this->assertArrayHasKey('body', $encoded); + $this->assertArrayHasKey('headers', $encoded); + $this->assertArrayHasKey('type', $encoded['headers']); + $this->assertEquals(DummyMessage::class, $encoded['headers']['type']); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php new file mode 100644 index 0000000000000..0b0411e412f8e --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -0,0 +1,90 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\ReceiverInterface; +use Symfony\Component\Messenger\Worker; + +class WorkerTest extends TestCase +{ + public function testWorkerDispatchTheReceivedMessage() + { + $receiver = new CallbackReceiver(function() { + yield new DummyMessage('API'); + yield new DummyMessage('IPA'); + }); + + $bus = $this->createMock(MessageBusInterface::class); + + $bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API'))); + $bus->expects($this->at(1))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('IPA'))); + + $worker = new Worker($receiver, $bus); + $worker->run(); + } + + public function testWorkerDoesNotWrapMessagesAlreadyWrappedInReceivedMessages() + { + $receiver = new CallbackReceiver(function() { + yield new ReceivedMessage(new DummyMessage('API')); + }); + + $bus = $this->createMock(MessageBusInterface::class); + + $bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API'))); + + $worker = new Worker($receiver, $bus); + $worker->run(); + } + + public function testWorkerIsThrowingExceptionsBackToGenerators() + { + $receiver = new CallbackReceiver(function() { + try { + yield new DummyMessage('Hello'); + + $this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.'); + } catch (\InvalidArgumentException $e) { + // This should be called because of the exception sent back to the generator. + $this->assertTrue(true); + } + }); + + $bus = $this->createMock(MessageBusInterface::class); + + $bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); + + $worker = new Worker($receiver, $bus); + $worker->run(); + } +} + +class CallbackReceiver implements ReceiverInterface +{ + private $callable; + + public function __construct(callable $callable) + { + $this->callable = $callable; + } + + public function receive(): iterable + { + $callable = $this->callable; + + return $callable(); + } +} diff --git a/src/Symfony/Component/Messenger/Transport/Enhancers/MaximumCountReceiver.php b/src/Symfony/Component/Messenger/Transport/Enhancers/MaximumCountReceiver.php new file mode 100644 index 0000000000000..489d52cd5482a --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Enhancers/MaximumCountReceiver.php @@ -0,0 +1,51 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Enhancers; + +use Symfony\Component\Messenger\Transport\ReceiverInterface; + +/** + * @author Samuel Roze + */ +class MaximumCountReceiver implements ReceiverInterface +{ + private $decoratedReceiver; + private $maximumNumberOfMessages; + + public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNumberOfMessages) + { + $this->decoratedReceiver = $decoratedReceiver; + $this->maximumNumberOfMessages = $maximumNumberOfMessages; + } + + public function receive(): iterable + { + $iterator = $this->decoratedReceiver->receive(); + $receivedMessages = 0; + + foreach ($iterator as $message) { + try { + yield $message; + } catch (\Throwable $e) { + if (!$iterator instanceof \Generator) { + throw $e; + } + + $iterator->throw($e); + } + + if (++$receivedMessages >= $this->maximumNumberOfMessages) { + break; + } + } + } +} diff --git a/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php new file mode 100644 index 0000000000000..d7fa8673eabda --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php @@ -0,0 +1,22 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport; + +/** + * @author Samuel Roze + * + * @experimental in 4.1 + */ +interface ReceiverInterface +{ + public function receive(): iterable; +} diff --git a/src/Symfony/Component/Messenger/Transport/SenderInterface.php b/src/Symfony/Component/Messenger/Transport/SenderInterface.php new file mode 100644 index 0000000000000..a142e1f00995e --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/SenderInterface.php @@ -0,0 +1,27 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport; + +/** + * @author Samuel Roze + * + * @experimental in 4.1 + */ +interface SenderInterface +{ + /** + * Sends the given message. + * + * @param object $message + */ + public function send($message); +} diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/DecoderInterface.php b/src/Symfony/Component/Messenger/Transport/Serialization/DecoderInterface.php new file mode 100644 index 0000000000000..0099356aa3582 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Serialization/DecoderInterface.php @@ -0,0 +1,34 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Serialization; + +/** + * @author Samuel Roze + * + * @experimental in 4.1 + */ +interface DecoderInterface +{ + /** + * Decodes the message from an encoded-form. + * + * The `$encodedMessage` parameter is a key-value array that + * describes the message, that will be used by the different adapters. + * + * The most common keys are: + * - `body` (string) - the message body + * - `headers` (string) - a key/value pair of headers + * + * @return object + */ + public function decode(array $encodedMessage); +} diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/EncoderInterface.php b/src/Symfony/Component/Messenger/Transport/Serialization/EncoderInterface.php new file mode 100644 index 0000000000000..cdc4a75e93826 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Serialization/EncoderInterface.php @@ -0,0 +1,32 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Serialization; + +/** + * @author Samuel Roze + * + * @experimental in 4.1 + */ +interface EncoderInterface +{ + /** + * Encodes a message to a common format understandable by adapters. The encoded array should only + * contain scalar and arrays. + * + * The most common keys of the encoded array are: + * - `body` (string) - the message body + * - `headers` (string) - a key/value pair of headers + * + * @param object $message The object that is put on the MessageBus by the user + */ + public function encode($message): array; +} diff --git a/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php b/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php new file mode 100644 index 0000000000000..76d747ad99552 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Serialization/Serializer.php @@ -0,0 +1,54 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Serialization; + +use Symfony\Component\Serializer\SerializerInterface; + +/** + * @author Samuel Roze + */ +class Serializer implements DecoderInterface, EncoderInterface +{ + private $serializer; + private $format; + + public function __construct(SerializerInterface $serializer, string $format = 'json') + { + $this->serializer = $serializer; + $this->format = $format; + } + + /** + * {@inheritdoc} + */ + public function decode(array $encodedMessage) + { + if (empty($encodedMessage['body']) || empty($encodedMessage['headers'])) { + throw new \InvalidArgumentException('Encoded message should have at least a `body` and some `headers`.'); + } elseif (empty($encodedMessage['headers']['type'])) { + throw new \InvalidArgumentException('Encoded message does not have a `type` header.'); + } + + return $this->serializer->deserialize($encodedMessage['body'], $encodedMessage['headers']['type'], $this->format); + } + + /** + * {@inheritdoc} + */ + public function encode($message): array + { + return array( + 'body' => $this->serializer->serialize($message, $this->format), + 'headers' => array('type' => get_class($message)), + ); + } +} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php new file mode 100644 index 0000000000000..25c2897fe60c3 --- /dev/null +++ b/src/Symfony/Component/Messenger/Worker.php @@ -0,0 +1,56 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger; + +use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage; +use Symfony\Component\Messenger\Transport\ReceiverInterface; + +/** + * @author Samuel Roze + * + * @experimental in 4.1 + */ +class Worker +{ + private $receiver; + private $bus; + + public function __construct(ReceiverInterface $receiver, MessageBusInterface $bus) + { + $this->receiver = $receiver; + $this->bus = $bus; + } + + /** + * Receive the messages and dispatch them to the bus. + */ + public function run() + { + $iterator = $this->receiver->receive(); + + foreach ($iterator as $message) { + if (!$message instanceof ReceivedMessage) { + $message = new ReceivedMessage($message); + } + + try { + $this->bus->dispatch($message); + } catch (\Throwable $e) { + if (!$iterator instanceof \Generator) { + throw $e; + } + + $iterator->throw($e); + } + } + } +} diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json new file mode 100644 index 0000000000000..7f7d982dc255f --- /dev/null +++ b/src/Symfony/Component/Messenger/composer.json @@ -0,0 +1,41 @@ +{ + "name": "symfony/messenger", + "type": "library", + "description": "Symfony Messenger Component", + "keywords": [], + "homepage": "https://symfony.com", + "license": "MIT", + "authors": [ + { + "name": "Samuel Roze", + "email": "samuel.roze@gmail.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "require": { + "php": "^7.1.3" + }, + "require-dev": { + "symfony/serializer": "~3.4|~4.0", + "symfony/dependency-injection": "~3.4|~4.0", + "symfony/property-access": "~3.4|~4.0" + }, + "suggest": { + "sroze/enqueue-bridge": "For using the php-enqueue library as an adapter." + }, + "autoload": { + "psr-4": { "Symfony\\Component\\Messenger\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "4.0-dev" + } + } +} diff --git a/src/Symfony/Component/Messenger/phpunit.xml.dist b/src/Symfony/Component/Messenger/phpunit.xml.dist new file mode 100644 index 0000000000000..3eff653caf38c --- /dev/null +++ b/src/Symfony/Component/Messenger/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + + + + ./Tests/ + + + + + + ./ + + ./Tests + ./vendor + + + +