8000 Create the Message component · symfony/symfony@2b39e52 · GitHub
[go: up one dir, main page]

Skip to content

Commit 2b39e52

Browse files
committed
Create the Message component
1 parent 424cbcc commit 2b39e52

30 files changed

+1314
-1
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Bundle\FrameworkBundle\Command;
13+
14+
use Symfony\Component\Console\Command\Command;
15+
use Symfony\Component\Console\Input\InputArgument;
16+
use Symfony\Component\Console\Input\InputInterface;
17+
use Symfony\Component\Console\Input\InputOption;
18+
use Symfony\Component\Console\Output\OutputInterface;
19+
use Symfony\Component\DependencyInjection\ContainerInterface;
20+
use Symfony\Component\Message\Asynchronous\ConsumedMessage;
21+
use Symfony\Component\Message\MessageBusInterface;
22+
use Symfony\Component\Message\MessageConsumerInterface;
23+
24+
/**
25+
* @author Samuel Roze <samuel.roze@gmail.com>
26+
*/
27+
class MessageConsumeCommand extends Command
28+
{
29+
protected static $defaultName = 'message:consume';
30+
31+
/**
32+
* {@inheritdoc}
33+
*/
34+
protected function configure()
35+
{
36+
$this
37+
->setDefinition(array(
38+
new InputArgument('consumer', InputArgument::REQUIRED, 'Name of the consumer'),
39+
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to dispatch the messages to', 'message_bus'),
40+
))
41+
->setDescription('Consume a message')
42+
->setHelp(<<<'EOF'
43+
The <info>%command.name%</info> command consume a message and dispatch it to the message bus.
44+
45+
%command.full_name% <consumer-service-name>
46+
47+
EOF
48+
)
49+
;
50+
}
51+
52+
/**
53+
* {@inheritdoc}
54+
*/
55+
protected function execute(InputInterface $input, OutputInterface $output)
56+
{
57+
/** @var ContainerInterface $container */
58+
$container = $this->getApplication()->getKernel()->getContainer();
59+
60+
if (!$container->has($consumerName = $input->getArgument('consumer'))) {
61+
throw new \RuntimeException(sprintf('Consumer "%s" do not exists', $consumerName));
62+
} elseif (!($consumer = $container->get($consumerName)) instanceof MessageConsumerInterface) {
63+
throw new \RuntimeException(sprintf('Consumer "%s" is not a valid message consumer. It should implement the interface "%s"', $consumerName, MessageConsumerInterface::class));
64+
}
65+
66+
if (!$container->has($busName = $input->getOption('bus'))) {
67+
throw new \RuntimeException(sprintf('Bus "%s" do not exists', $busName));
68+
} elseif (!($messageBus = $container->get($busName)) instanceof MessageBusInterface) {
69+
throw new \RuntimeException(sprintf('Bus "%s" is not a valid message bus. It should implement the interface "%s"', $busName, MessageBusInterface::class));
70+
}
71+
72+
foreach ($consumer->consume() as $message) {
73+
if (!$message instanceof ConsumedMessage) {
74+
$message = new ConsumedMessage($message);
75+
}
76+
77+
$messageBus->handle($message);
78+
}
79+
}
80+
}

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
use Symfony\Component\Lock\Lock;
2323
use Symfony\Component\Lock\Store\SemaphoreStore;
2424
use Symfony\Component\Security\Csrf\CsrfTokenManagerInterface;
25+
use Symfony\Component\Message\MessageBusInterface;
2526
use Symfony\Component\Serializer\Serializer;
2627
use Symfony\Component\Translation\Translator;
2728
use Symfony\Component\Validator\Validation;
@@ -101,6 +102,7 @@ public function getConfigTreeBuilder()
101102
$this->addPhpErrorsSection($rootNode);
102103
$this->addWebLinkSection($rootNode);
103104
$this->addLockSection($rootNode);
105+
$this->addMessageSection($rootNode);
104106

105107
return $treeBuilder;
106108
}
@@ -873,4 +875,35 @@ private function addWebLinkSection(ArrayNodeDefinition $rootNode)
873875
->end()
874876
;
875877
}
878+
879+
private function addMessageSection(ArrayNodeDefinition $rootNode)
880+
{
881+
$rootNode
882+
->children()
883+
->arrayNode('message')
884+
->info('Message configuration')
885+
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
886+
->children()
887+
->arrayNode('routing')
888+
->useAttributeAsKey('message_class')
889+
->prototype('array')
890+
->beforeNormalization()
891+
->ifString()
892+
->then(function ($v) {
893+
return array('producers' => array($v));
894+
})
895+
->end()
896+
->children()
897+
->arrayNode('producers')
898+
->requiresAtLeastOneElement()
899+
->prototype('scalar')->end()
900+
->end()
901+
->end()< F438 /div>
902+
->end()
903+
->end()
904+
->end()
905+
->end()
906+
->end()
907+
;
908+
}
876909
}

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,10 @@ public function load(array $configs, ContainerBuilder $container)
247247
$this->registerLockConfiguration($config['lock'], $container, $loader);
248248
}
249249

250+
if ($this->isConfigEnabled($container, $config['message'])) {
251+
$this->registerMessageConfiguration($config['message'], $container, $loader);
252+
}
253+
250254
if ($this->isConfigEnabled($container, $config['web_link'])) {
251255
if (!class_exists(HttpHeaderSerializer::class)) {
252256
throw new LogicException('WebLink support cannot be enabled as the WebLink component is not installed.');
@@ -1327,6 +1331,20 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont
13271331
}
13281332
}
13291333

1334+
private function registerMessageConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
1335+
{
1336+
$loader->load('message.xml');
1337+
1338+
$messageToProducerMapping = array();
1339+
foreach ($config['routing'] as $message => $messageConfiguration) {
1340+
$messageToProducerMapping[$message] = array_map(function (string $serviceName) {
1341+
return new Reference($serviceName);
1342+
}, $messageConfiguration['producers']);
1343+
}
1344+
1345+
$container->getDefinition('message.asynchronous.routing.producer_for_message_resolver')->setArgument(0, $messageToProducerMapping);
1346+
}
1347+
13301348
private function registerCacheConfiguration(array $config, ContainerBuilder $container)
13311349
{
13321350
$version = substr(str_replace('/', '-', base64_encode(hash('sha256', uniqid(mt_rand(), true), true))), 0, 22);

src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
use Symfony\Component\HttpKernel\DependencyInjection\RegisterControllerArgumentLocatorsPass;
3333
use Symfony\Component\HttpKernel\DependencyInjection\RemoveEmptyControllerArgumentLocatorsPass;
3434
use Symfony\Component\HttpKernel\DependencyInjection\ResettableServicePass;
35+
use Symfony\Component\Message\DependencyInjection\MessagePass;
3536
use Symfony\Component\PropertyInfo\DependencyInjection\PropertyInfoPass;
3637
use Symfony\Component\Routing\DependencyInjection\RoutingResolverPass;
3738
use Symfony\Component\Serializer\DependencyInjection\SerializerPass;
@@ -114,6 +115,7 @@ public function build(ContainerBuilder $container)
114115
$this->addCompilerPassIfExists($container, FormPass::class);
115116
$container->addCompilerPass(new WorkflowGuardListenerPass());
116117
$container->addCompilerPass(new ResettableServicePass());
118+
$this->addCompilerPassIfExists($container, MessagePass::class);
117119

118120
if ($container->getParameter('kernel.debug')) {
119121
$container->addCompilerPass(new AddDebugLogProcessorPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, -32);

src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@
6464
<tag name="console.command" command="debug:event-dispatcher" />
6565
</service>
6666

67-
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
67+
<service id="Symfony\Bundle\FrameworkBundle\Command\MessageConsumeCommand">
68+
<tag name="console.command" command="message:consume" />
69+
</service>
70+
71+
<service id="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
6872
<argument type="service" id="router" />
6973
<tag name="console.command" command="debug:router" />
7074
</service>
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?xml version="1.0" ?>
2+
3+
<container xmlns="http://symfony.com/schema/dic/services"
4+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">
6+
7+
<services>
8+
<defaults public="false" />
9+
10+
<!-- Bus -->
11+
<service id="message_bus" class="Symfony\Component\Message\MessageBus" public="true">
12+
<argument type="collection" /> <!-- Middlewares -->
13+
</service>
14+
15+
<service id="message.middleware.debug.logging" class="Symfony\Component\Message\Debug\LoggingMiddleware">
16+
<argument type="service" id="logger" />
17+
18+
<tag name="message_middleware" priority="10" />
19+
</service>
20+
21+
<!-- Handlers -->
22+
<service id="message.handler_resolver" class="Symfony\Component\Message\MessageHandlerResolver">
23+
<argument type="collection" /> <!-- Message to handler mapping -->
24+
</service>
25+
26+
<service id="message.middleware.call_message_handler" class="Symfony\Component\Message\Middleware\CallMessageHandlerMiddleware">
27+
<argument type="service" id="message.handler_resolver" />
28+
29+
<tag name="message_middleware" priority="-10" />
30+
</service>
31+
32+
<!-- Asynchronous -->
33+
<service id="message.asynchronous.routing.producer_for_message_resolver" class="Symfony\Component\Message\Asynchronous\Routing\ProducerForMessageResolver">
34+
<argument type="collection" /> <!-- Message to producer mapping -->
35+
</service>
36+
<service id="message.asynchronous.middleware.send_message_to_producer" class="Symfony\Component\Message\Asynchronous\Middleware\SendMessageToProducersMiddleware">
37+
<argument type="service" id="message.asynchronous.routing.producer_for_message_resolver" />
38+
39+
<tag name="message_middleware" priority="-5" />
40+
</service>
41+
42+
<!-- Message encoding/decoding -->
43+
<service id="message.transport.serialize_message_with_type_in_headers" class="Symfony\Component\Message\Transport\SerializeMessageWithTypeInHeaders">
44+
<argument type="service" id="serializer" />
45+
</service>
46+
47+
<service id="message.transport.default_encoder" alias="message.transport.serialize_message_with_type_in_headers" public="true" />
48+
<service id="message.transport.default_decoder" alias="message.transport.serialize_message_with_type_in_headers" public="true" />
49+
</services>
50+
</container>
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Message\Asynchronous;
13+
14+
/**
15+
* Wraps a consumed message. This is mainly used by the `SendMessageToProducersMiddleware` middleware to identify
16+
* a message should not be re-produced if it was just consumed.
17+
*
18+
* @author Samuel Roze <samuel.roze@gmail.com>
19+
*/
20+
final class ConsumedMessage
21+
{
22+
private $message;
23+
24+
public function __construct($message)
25+
{
26+
$this->message = $message;
27+
}
28+
29+
public function getMessage()
30+
{
31+
return $this->message;
32+
}
33+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Message\Asynchronous\Consumer;
13+
14+
use Symfony\Component\Message\Asynchronous\ConsumedMessage;
15+
use Symfony\Component\Message\MessageConsumerInterface;
16+
17+
/**
18+
* @author Samuel Roze <samuel.roze@gmail.com>
19+
*/
20+
class WrapIntoConsumedMessageConsumer implements MessageConsumerInterface
21+
{
22+
/**
23+
* @var MessageConsumerInterface
24+
*/
25+
private $decoratedConsumer;
26+
27+
public function __construct(MessageConsumerInterface $decoratedConsumer)
28+
{
29+
$this->decoratedConsumer = $decoratedConsumer;
30+
}
31+
32+
public function consume(): \Generator
33+
{
34+
foreach ($this->decoratedConsumer->consume() as $message) {
35+
yield new ConsumedMessage($message);
36+
}
37+
}
38+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Message\Asynchronous\Middleware;
13+
14+
use Symfony\Component\Message\Asynchronous\ConsumedMessage;
15+
use Symfony\Component\Message\Asynchronous\Routing\ProducerForMessageResolverInterface;
16+
use Symfony\Component\Message\MessageBusMiddlewareInterface;
17+
18+
/**
19+
* @author Samuel Roze <samuel.roze@gmail.com>
20+
*/
21+
class SendMessageToProducersMiddleware implements MessageBusMiddlewareInterface
22+
{
23+
/**
24+
* @var ProducerForMessageResolverInterface
25+
*/
26+
private $producerForMessageResolver;
27+
28+
public function __construct(ProducerForMessageResolverInterface $producerForMessageResolver)
29+
{
30+
$this->producerForMessageResolver = $producerForMessageResolver;
31+
}
32+
33+
/**
34+
* {@inheritdoc}
35+
*/
36+
public function handle($message, callable $next)
37+
{
38+
if ($message instanceof ConsumedMessage) {
39+
$message = $message->getMessage();
40+
} elseif (!empty($producers = $this->producerForMessageResolver->getProducersForMessage($message))) {
41+
foreach ($producers as $producer) {
42+
$producer->produce($message);
43+
}
44+
45+
if (!in_array(null, $producers)) {
46+
return;
47+
}
48+
}
49+
50+
return $next($message);
51+
}
52+
}

0 commit comments

Comments
 (0)
0