8000 [Messenger] use events consistently in worker by Tobion · Pull Request #34217 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] use events consistently in worker #34217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions UPGRADE-4.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,17 @@ Lock
Messenger
---------

* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Changed arguments of `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
* [BC BREAK] Removed `UnknownSenderException`.
* [BC BREAK] Removed `WorkerInterface`.
* [BC BREAK] Removed `$onHandledCallback` of `Worker::run(array $options = [], callable $onHandledCallback = null)`.
* [BC BREAK] Removed `StopWhenMemoryUsageIsExceededWorker` in favor of `StopWorkerOnMemoryLimitListener`.
* [BC BREAK] Removed `StopWhenMessageCountIsExceededWorker` in favor of `StopWorkerOnMessageLimitListener`.
* [BC BREAK] Removed `StopWhenTimeLimitIsReachedWorker` in favor of `StopWorkerOnTimeLimitListener`.
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
* Marked the `MessengerDataCollector` class as `@final`.

Mime
Expand Down
< 8000 tr data-hunk="d917099d9e94e8ffe45b322d772ec00de1d08fdee3ec9749d6e82b7bf0b222e3" class="show-top-border">
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,9 @@
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
<argument /> <!-- Routable message bus -->
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="event_dispatcher" />
<argument type="service" id="logger" on-invalid="null" />
<argument type="collection" /> <!-- Receiver names -->
<argument type="service" id="event_dispatcher" />
<call method="setCachePoolForRestartSignal">
<argument type="service" id="cache.messenger.restart_workers_signal" />
</call>

<tag name="console.command" command="messenger:consume" />
<tag name="console.command" command="messenger:consume-messages" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
<argument /> <!-- max delay ms -->
</service>

<!-- worker event listeners -->
<service id="messenger.retry.send_failed_message_for_retry_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
Expand All @@ -106,14 +107,28 @@
<argument type="service" id="logger" on-invalid="ignore" />
</service>

<!-- failed handling -->
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
<argument /> <!-- Failure transport -->
<argument type="service" id="logger" on-invalid="ignore" />
</service>

<service id="messenger.listener.dispatch_pcntl_signal_listener" class="Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener">
<tag name="kernel.event_subscriber" />
</service>

<service id="messenger.listener.stop_worker_on_restart_signal_listener" class="Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
<argument type="service" id="cache.messenger.restart_workers_signal" />
<argument type="service" id="logger" on-invalid="ignore" />
</service>

<service id="messenger.listener.stop_worker_on_sigterm_signal_listener" class="Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener">
<tag name="kernel.event_subscriber" />
</service>

<!-- routable message bus -->
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
<argument /> <!-- Message bus locator -->
Expand Down
11 changes: 8 additions & 3 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ CHANGELOG
4.4.0
-----

* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
pass a `RoutableMessageBus` instance instead.
* Added support for auto trimming of Redis streams.
* `InMemoryTransport` handle acknowledged and rejected messages.
* Made all dispatched worker event classes final.
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
* Added support for passing `dbindex` as a query parameter to the redis transport DSN.
* Added `WorkerStartedEvent` and `WorkerRunningEvent`
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Changed arguments of `ConsumeMessagesCommand::__construct`.
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
* [BC BREAK] Removed `UnknownSenderException`.
* [BC BREAK] Removed `WorkerInterface`.
* [BC BREAK] Removed `$onHandledCallback` of `Worker::run(array $options = [], callable $onHandledCallback = null)`.
* [BC BREAK] Removed `StopWhenMemoryUsageIsExceededWorker` in favor of `StopWorkerOnMemoryLimitListener`.
* [BC BREAK] Removed `StopWhenMessageCountIsExceededWorker` in favor of `StopWorkerOnMessageLimitListener`.
* [BC BREAK] Removed `StopWhenTimeLimitIsReachedWorker` in favor of `StopWorkerOnTimeLimitListener`.
* [BC BREAK] Removed `StopWhenRestartSignalIsReceived` in favor of `StopWorkerOnRestartSignalListener`.
* The component is not marked as `@experimental` anymore.
* Marked the `MessengerDataCollector` class as `@final`.

Expand Down
42 changes: 12 additions & 30 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

namespace Symfony\Component\Messenger\Command;

use Psr\Cache\CacheItemPoolInterface;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
Expand All @@ -23,13 +22,12 @@
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
Expand All @@ -43,13 +41,11 @@ class ConsumeMessagesCommand extends Command
private $logger;
private $receiverNames;
private $eventDispatcher;
/** @var CacheItemPoolInterface|null */
private $restartSignalCachePool;

/**
* @param RoutableMessageBus $routableBus
*/
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* EventDispatcherInterface */ $eventDispatcher = null)
public function __construct($routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EventDispatcher is required to make the limit options reliable.

{
if ($routableBus instanceof ContainerInterface) {
@trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED);
Expand All @@ -58,12 +54,6 @@ public function __construct($routableBus, ContainerInterface $receiverLocator, L
throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class));
}

if (null !== $eventDispatcher && !$eventDispatcher instanceof EventDispatcherInterface) {
@trigger_error(sprintf('The 5th argument of the class "%s" should be a "%s"', __CLASS__, EventDispatcherInterface::class), E_USER_DEPRECATED);

$eventDispatcher = null;
}

$this->routableBus = $routableBus;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
Expand All @@ -73,11 +63,6 @@ public function __construct($routableBus, ContainerInterface $receiverLocator, L
parent::__construct();
}

public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool)
{
$this->restartSignalCachePool = $restartSignalCachePool;
}

/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -177,29 +162,23 @@ protected function execute(InputInterface $input, OutputInterface $output)
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
}

$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
if ($limit = $input->getOption('limit')) {
$stopsWhen[] = "processed {$limit} messages";
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
}

if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
}

if ($timeLimit = $input->getOption('time-limit')) {
$stopsWhen[] = "been running for {$timeLimit}s";
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
$this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger));
}

if (null !== $this->restartSignalCachePool) {
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
$worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger);
}
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';

$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
Expand All @@ -216,6 +195,9 @@ protected function execute(InputInterface $input, OutputInterface $output)
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
}

$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$worker->run([
'sleep' => $input->getOption('sleep') * 1000000,
]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
Expand Down Expand Up @@ -87,6 +87,8 @@ protected function configure(): void
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
$io->comment('Quit this command with CONTROL-C.');
if (!$output->isVeryVerbose()) {
Expand Down Expand Up @@ -158,7 +160,9 @@ private function runInteractive(SymfonyStyle $io, bool $shouldForce)

private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
{
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce) {
$count = 0;
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) {
++$count;
$envelope = $messageReceivedEvent->getEnvelope();

$this->displaySingleMessage($envelope, $io);
Expand All @@ -181,14 +185,8 @@ private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $
$this->logger
);

$count = 0;
try {
$worker->run([], function (?Envelope $envelope) use ($worker, &$count) {
++$count;
if (null === $envelope) {
$worker->stop();
}
});
$worker->run();
} finally {
$this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;

/**
* @author Ryan Weaver <ryan@symfonycasts.com>
Expand Down Expand Up @@ -63,7 +63,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

$cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::RESTART_REQUESTED_TIMESTAMP_KEY);
$cacheItem = $this->restartSignalCachePool->getItem(StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY);
$cacheItem->set(microtime(true));
$this->restartSignalCachePool->save($cacheItem);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
$consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus'));
}

$consumeCommandDefinition->replaceArgument(3, array_values($receiverNames));
$consumeCommandDefinition->replaceArgument(4, array_values($receiverNames));
}

if ($container->hasDefinition('console.command.messenger_setup_transports')) {
Expand Down
44 changes: 44 additions & 0 deletions src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Event;

use Symfony\Component\Messenger\Worker;

/**
* Dispatched after the worker processed a message or didn't receive a message at all.
*
* @author Tobias Schultze <http://tobion.de>
*/
final class WorkerRunningEvent
{
private $worker;
private $isWorkerIdle;

public function __construct(Worker $worker, bool $isWorkerIdle)
{
$this->worker = $worker;
$this->isWorkerIdle = $isWorkerIdle;
}

public function getWorker(): Worker
{
return $this->worker;
}

/**
* Returns true when no message has been received by the worker.
*/
public function isWorkerIdle(): bool
{
return $this->isWorkerIdle;
}
}
34 changes: 34 additions & 0 deletions src/Symfony/Component/Messenger/Event/WorkerStartedEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Event;

use Symfony\Component\Messenger\Worker;

/**
* Dispatched when a worker has been started.
*
* @author Tobias Schultze <http://tobion.de>
*/
final class WorkerStartedEvent
{
private $worker;

public function __construct(Worker $worker)
{
$this->worker = $worker;
}

public function getWorker(): Worker
{
return $this->worker;
}
}
13 changes: 13 additions & 0 deletions src/Symfony/Component/Messenger/Event/WorkerStoppedEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,24 @@

namespace Symfony\Component\Messenger\Event;

use Symfony\Component\Messenger\Worker;

/**
* Dispatched when a worker has been stopped.
*
* @author Robin Chalas <robin.chalas@gmail.com>
*/
final class WorkerStoppedEvent
{
private $worker;

public function __construct(Worker $worker)
{
$this->worker = $worker;
}

public function getWorker(): Worker
{
return $this->worker;
}
}
Loading
0