8000 [Messenger][Profiler] Trace middleware execution by ogizanagi · Pull Request #27321 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
'event_listener': '#00B8F5',
'template': '#66CC00',
'doctrine': '#FF6633',
'messenger.middleware': '#BDB81E',
} %}
{% endif %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use Symfony\Component\Messenger\Handler\ChainHandler;
use Symfony\Component\Messenger\Handler\Locator\ContainerHandlerLocator;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
use Symfony\Component\Messenger\Middleware\Enhancers\TraceableMiddleware;
use Symfony\Component\Messenger\TraceableMessageBus;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
Expand All @@ -37,13 +38,15 @@ class MessengerPass implements CompilerPassInterface
private $busTag;
private $senderTag;
private $receiverTag;
private $debugStopwatchId;

public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver')
public function __construct(string $handlerTag = 'messenger.message_handler', string $busTag = 'messenger.bus', string $senderTag = 'messenger.sender', string $receiverTag = 'messenger.receiver', string $debugStopwatchId = 'debug.stopwatch')
{
$this->handlerTag = $handlerTag;
$this->busTag = $busTag;
$this->senderTag = $senderTag;
$this->receiverTag = $receiverTag;
$this->debugStopwatchId = $debugStopwatchId;
}

/**
Expand Down Expand Up @@ -303,6 +306,7 @@ private function registerBusToCollector(ContainerBuilder $container, string $bus

private function registerBusMiddleware(ContainerBuilder $container, string $busId, array $middlewareCollection)
{
$debug = $container->getParameter('kernel.debug') && $container->has($this->debugStopwatchId);
$middlewareReferences = array();
foreach ($middlewareCollection as $middlewareItem) {
$id = $middlewareItem['id'];
Expand All @@ -315,7 +319,7 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
throw new RuntimeException(sprintf('Invalid middleware "%s": define such service to be able to use it.', $id));
}

if (($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
if ($isDefinitionAbstract = ($definition = $container->findDefinition($messengerMiddlewareId))->isAbstract()) {
$childDefinition = new ChildDefinition($messengerMiddlewareId);
$count = \count($definition->getArguments());
foreach (array_values($arguments ?? array()) as $key => $argument) {
Expand All @@ -329,6 +333,21 @@ private function registerBusMiddleware(ContainerBuilder $container, string $busI
throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
}

if ($debug) {
$container->register($debugMiddlewareId = '.messenger.debug.traced.'.$messengerMiddlewareId, TraceableMiddleware::class)
// Decorates with a high priority so it's applied the earliest:
->setDecoratedService($messengerMiddlewareId, null, 100)
->setArguments(array(
new Reference($debugMiddlewareId.'.inner'),
new Reference($this->debugStopwatchId),
// In case the definition isn't abstract,
// we cannot be sure the service instance is used by one bus only.
// So we only inject the bus name when the original definition is abstract.
$isDefinitionAbstract ? $busId : null,
))
;
}

$middlewareReferences[] = new Reference($messengerMiddlewareId);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Middleware\Enhancers;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\EnvelopeAwareInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Stopwatch\Stopwatch;

/**
* Collects some data about a middleware.
*
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class TraceableMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $inner;
private $stopwatch;
private $busName;
private $eventCategory;

public function __construct(MiddlewareInterface $inner, Stopwatch $stopwatch, string $busName = null, string $eventCategory = 'messenger.middleware')
{
$this->inner = $inner;
$this->stopwatch = $stopwatch;
$this->busName = $busName;
$this->eventCategory = $eventCategory;
}

/**
* @param Envelope $envelope
*/
public function handle($envelope, callable $next)
{
$class = \get_class($this->inner);
$eventName = 'c' === $class[0] && 0 === strpos($class, "class@anonymous\0") ? get_parent_class($class).'@anonymous' : $class;

if ($this->busName) {
$eventName .= " (bus: {$this->busName})";
}

$this->stopwatch->start($eventName, $this->eventCategory);

try {
$result = $this->inner->handle($envelope->getMessageFor($this->inner), function ($message) use ($next, $eventName) {
$this->stopwatch->stop($eventName);
$result = $next($message);
$this->stopwatch->start($eventName, $this->eventCategory);

return $result;
});
} finally {
if ($this->stopwatch->isStarted($eventName)) {
$this->stopwatch->stop($eventName);
}
}

return $result;
}
}
5CC0
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Stopwatch\Stopwatch;

class MessengerPassTest extends TestCase
{
Expand Down Expand Up @@ -561,6 +562,39 @@ public function testMiddlewareFactoryDefinitionMustBeAbstract()
(new MessengerPass())->process($container);
}

public function testDecoratesWithTraceableMiddlewareOnDebug()
{
$container = $this->getContainerBuilder();

$container->register($busId = 'message_bus', MessageBusInterface::class)->setArgument(0, array())->addTag('messenger.bus');
$container->register('abstract_middleware', UselessMiddleware::class)->setAbstract(true);
$container->register('concrete_middleware', UselessMiddleware::class);

$container->setParameter($middlewareParameter = $busId.'.middleware', array(
array('id' => 'abstract_middleware'),
array('id' => 'concrete_middleware'),
));

$container->setParameter('kernel.debug', true);
$container->register('debug.stopwatch', Stopwatch::class);

(new MessengerPass())->process($container);

$this->assertNotNull($concreteDef = $container->getDefinition('.messenger.debug.traced.concrete_middleware'));
$this->assertEquals(array(
new Reference('.messenger.debug.traced.concrete_middleware.inner'),
new Reference('debug.stopwatch'),
null,
), $concreteDef->getArguments());

$this->assertNotNull($abstractDef = $container->getDefinition(".messenger.debug.traced.$busId.middleware.abstract_middleware"));
$this->assertEquals(array(
new Reference(".messenger.debug.traced.$busId.middleware.abstract_middleware.inner"),
new Reference('debug.stopwatch'),
$busId,
), $abstractDef->getArguments());
}

public function testItRegistersTheDebugCommand()
{
$container = $this->getContainerBuilder($commandBusId = 'command_bus');
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Middleware\Enhancers;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\Enhancers\TraceableMiddleware;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Stopwatch\Stopwatch;

/**
* @author Maxime Steinhausser <maxime.steinhausser@gmail.com>
*/
class TraceableMiddlewareTest extends TestCase
D7DC

This comment was marked as resolved.

{
public function testHandle()
{
$busId = 'command_bus';
$envelope = Envelope::wrap($message = new DummyMessage('Hello'));

$middleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$middleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->will($this->returnCallback(function ($message, callable $next) {
return $next($message);
}))
;

$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next
->expects($this->once())
->method('__invoke')
->with($message)
->willReturn($expectedReturnedValue = 'Hello')
;

$stopwatch = $this->createMock(Stopwatch::class);
$stopwatch->expects($this->once())->method('isStarted')->willReturn(true);
$stopwatch->expects($this->exactly(2))
->method('start')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'), 'messenger.middleware')
;
$stopwatch->expects($this->exactly(2))
->method('stop')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'))
;

$traced = new TraceableMiddleware($middleware, $stopwatch, $busId);

$this->assertSame($expectedReturnedValue, $traced->handle($envelope, $next));
}

/**
* @expectedException \RuntimeException
* @expectedExceptionMessage Foo exception from next callable
*/
public function testHandleWithException()
{
$busId = 'command_bus';
$envelope = Envelope::wrap($message = new DummyMessage('Hello'));

$middleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock();
$middleware->expects($this->once())
->method('handle')
->with($message, $this->anything())
->will($this->returnCallback(function ($message, callable $next) {
return $next($message);
}))
;

$next = $this->createPartialMock(\stdClass::class, array('__invoke'));
$next
->expects($this->once())
->method('__invoke')
->willThrowException(new \RuntimeException('Foo exception from next callable'))
;

$stopwatch = $this->createMock(Stopwatch::class);
$stopwatch->expects($this->once())->method('isStarted')->willReturn(true);
// Start is only expected to be called once, as an exception is thrown by the next callable:
$stopwatch->expects($this->exactly(1))
->method('start')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'), 'messenger.middleware')
;
$stopwatch->expects($this->exactly(2))
->method('stop')
->with($this->matches('%sMiddlewareInterface%s (bus: command_bus)'))
;

$traced = new TraceableMiddleware($middleware, $stopwatch, $busId);
$traced->handle($envelope, $next);
}
}
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"symfony/process": "~3.4|~4.0",
"symfony/property-access": "~3.4|~4.0",
"symfony/serializer": "~3.4|~4.0",
"symfony/stopwatch": "~3.4|~4.0",
"symfony/validator": "~3.4|~4.0",
"symfony/var-dumper": "~3.4|~4.0"
},
Expand Down
0