-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] implementation of messenger:consume
, which processes messages concurrently
#53964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 7.4
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger; | ||
|
||
use Amp\Cache\LocalCache; | ||
use Amp\Cancellation; | ||
use Amp\Parallel\Worker\Task; | ||
use Amp\Sync\Channel; | ||
use App\Kernel; | ||
alli83 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
use Psr\Container\ContainerInterface; | ||
use Symfony\Component\Dotenv\Dotenv; | ||
use Symfony\Component\Messenger\Stamp\AckStamp; | ||
|
||
class DispatchTask implements Task | ||
Check failure on line 23 in src/Symfony/Component/Messenger/DispatchTask.php
|
||
{ | ||
private static ?LocalCache $cache = null; | ||
|
||
public function __construct( | ||
private Envelope $envelope, | ||
private array $stamps, | ||
private readonly string $env, | ||
private readonly bool $isDebug, | ||
private readonly string $projectDir, | ||
) { | ||
if (!class_exists(LocalCache::class)) { | ||
throw new \LogicException(\sprintf('Package "amp/cache" is required to use the "%s". Try running "composer require amphp/cache".', LocalCache::class)); | ||
} | ||
} | ||
|
||
public function run(Channel $channel, Cancellation $cancellation): mixed | ||
{ | ||
$container = $this->getContainer(); | ||
$envelope = $this->dispatch($container, $channel); | ||
|
||
return $envelope->withoutStampsOfType(AckStamp::class); | ||
} | ||
|
||
private function dispatch(ContainerInterface $container, $channel) | ||
{ | ||
$messageBus = $container->get(MessageBusInterface::class); | ||
|
||
return $messageBus->dispatch($this->envelope, $this->stamps); | ||
} | ||
|
||
private function getContainer() | ||
{ | ||
$cache = self::$cache ??= new LocalCache(); | ||
dunglas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
$container = $cache->get('cache-container'); | ||
|
||
// if not in cache, create container | ||
if (!$container) { | ||
if (!method_exists(Dotenv::class, 'bootEnv')) { | ||
throw new \LogicException(\sprintf("Method bootEnv de \"%s\" doesn't exist.", Dotenv::class)); | ||
} | ||
|
||
(new Dotenv())->bootEnv($this->projectDir.'/.env'); | ||
|
||
if (!class_exists(Kernel::class) && !isset($_ENV['KERNEL_CLASS'])) { | ||
throw new \LogicException('You must set the KERNEL_CLASS environment variable to the fully-qualified class name of your Kernel in .env or have "%s" class.', Kernel::class); | ||
} elseif (isset($_ENV['KERNEL_CLASS'])) { | ||
$kernel = new $_ENV['KERNEL_CLASS']($this->env, $this->isDebug); | ||
} else { | ||
$kernel = new Kernel($this->env, $this->isDebug); | ||
} | ||
|
||
$kernel->boot(); | ||
|
||
$container = $kernel->getContainer(); | ||
$cache->set('cache-container', $container); | ||
} | ||
|
||
return $container; | ||
} | ||
|
||
public function getEnvelope(): Envelope | ||
{ | ||
return $this->envelope; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger; | ||
|
||
use Amp\Parallel\Worker\ContextWorkerPool; | ||
nicolas-grekas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
use Symfony\Component\Messenger\Stamp\FutureStamp; | ||
|
||
use function Amp\async; | ||
use function Amp\Parallel\Worker\workerPool; | ||
|
||
/** | ||
* Using this bus will enable concurrent message processing without the need for multiple workers | ||
* using multiple processes or threads. | ||
* It requires a ZTS build of PHP 8.2+ and ext-parallel to create threads; otherwise, it will use processes. | ||
*/ | ||
final class ParallelMessageBus implements MessageBusInterface | ||
{ | ||
public static ?ContextWorkerPool $worker = null; | ||
Check failure on line 27 in src/Symfony/Component/Messenger/ParallelMessageBus.php
|
||
alli83 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
public function __construct( | ||
private array $something, | ||
private readonly string $env, | ||
private readonly string $debug, | ||
private readonly string $projectdir, | ||
) { | ||
if (!class_exists(ContextWorkerPool::class)) { | ||
throw new \LogicException(\sprintf('Package "amp/parallel" is required to use the "%s". Try running "composer require amphp/parallel".', self::class)); | ||
} | ||
} | ||
|
||
public function dispatch(object $message, array $stamps = []): Envelope | ||
{ | ||
$worker = (self::$worker ??= workerPool()); | ||
Check failure on line 42 in src/Symfony/Component/Messenger/ParallelMessageBus.php
|
||
|
||
$envelope = Envelope::wrap($message, $stamps); | ||
$task = new DispatchTask($envelope, $stamps, $this->env, $this->debug, $this->projectdir); | ||
Check failure on line 45 in src/Symfony/Component/Messenger/ParallelMessageBus.php
|
||
|
||
$future = async(function () use ($worker, $task) { | ||
Check failure on line 47 in src/Symfony/Component/Messenger/ParallelMessageBus.php
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can use |
||
return $worker->submit($task); | ||
}); | ||
|
||
return $envelope->with(new FutureStamp($future)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Stamp; | ||
|
||
use Amp\Future; | ||
|
||
/** | ||
* This stamp allows passing the future representing the potential result of the handler, | ||
* which is treated as an asynchronous operation, | ||
* and will be retrieved later by the worker to ack or nack based on the obtained result. | ||
*/ | ||
final readonly class FutureStamp implements StampInterface | ||
{ | ||
public function __construct(private Future $future) | ||
Check failure on line 23 in src/Symfony/Component/Messenger/Stamp/FutureStamp.php
|
||
{ | ||
} | ||
|
||
public function getFuture(): Future | ||
Check failure on line 27 in src/Symfony/Component/Messenger/Stamp/FutureStamp.php
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this com A377 ment to others. Learn more. This leaks the |
||
{ | ||
return $this->future; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
KERNEL_CLASS=Symfony\Component\Messenger\Tests\Fixtures\App\Kernel |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,46 @@ | ||||||
<?php | ||||||
|
||||||
namespace Symfony\Component\Messenger\Tests\Fixtures\App; | ||||||
|
||||||
use Symfony\Bundle\FrameworkBundle\FrameworkBundle; | ||||||
use Symfony\Bundle\FrameworkBundle\Kernel\MicroKernelTrait; | ||||||
use Symfony\Component\Config\Loader\LoaderInterface; | ||||||
use Symfony\Component\DependencyInjection\ContainerBuilder; | ||||||
use Symfony\Component\HttpKernel\Kernel as SymfonyKernel; | ||||||
use Symfony\Component\Messenger\MessageBus; | ||||||
use Symfony\Component\Messenger\MessageBusInterface; | ||||||
|
||||||
class Kernel extends SymfonyKernel | ||||||
{ | ||||||
use MicroKernelTrait; | ||||||
|
||||||
public function registerBundles(): iterable | ||||||
{ | ||||||
yield new FrameworkBundle(); | ||||||
} | ||||||
|
||||||
public function registerContainerConfiguration(LoaderInterface $loader): void | ||||||
{ | ||||||
$loader->load(function (ContainerBuilder $container) use ($loader) { | ||||||
$container->loadFromExtension('framework', [ | ||||||
'router' => [ | ||||||
'resource' => 'kernel::loadRoutes', | ||||||
'type' => 'service', | ||||||
], | ||||||
]); | ||||||
$container | ||||||
->register('message.bus', MessageBus::class); | ||||||
$container->setAlias(MessageBusInterface::class, 'message.bus')->setPublic(true); | ||||||
}); | ||||||
} | ||||||
|
||||||
public function getCacheDir(): string | ||||||
{ | ||||||
return sys_get_temp_dir().'/'. \Symfony\Component\HttpKernel\Kernel::VERSION.'/EmptyAppKernel/cache/'.$this->environment; | ||||||
} | ||||||
|
||||||
public function getLogDir(): string | ||||||
{ | ||||||
return sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel/logs'; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
and others in this file? |
||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Tests; | ||
|
||
use Amp\Parallel\Worker\Worker; | ||
use PHPUnit\Framework\TestCase; | ||
use Symfony\Component\Messenger\Envelope; | ||
use Symfony\Component\Messenger\MessageBusInterface; | ||
use Symfony\Component\Messenger\ParallelMessageBus; | ||
use Symfony\Component\Messenger\Stamp\FutureStamp; | ||
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; | ||
|
||
class ParallelMessageBusTest extends TestCase | ||
{ | ||
private string $env = 'dev'; | ||
private bool $debug = false; | ||
private string $projectDir = 'path/to/project'; | ||
|
||
public function testItHasTheRightInterface() | ||
{ | ||
if (!class_exists(Worker::class)) { | ||
$this->markTestSkipped(\sprintf('%s not available.', Worker::class)); | ||
} | ||
|
||
$bus = new ParallelMessageBus([], $this->env, $this->debug, $this->projectDir); | ||
|
||
$this->assertInstanceOf(MessageBusInterface::class, $bus); | ||
} | ||
|
||
public function testItReturnsWithFutureStamp() | ||
{ | ||
if (!class_exists(Worker::class)) { | ||
$this->markTestSkipped(\sprintf('%s not available.', Worker::class)); | ||
} | ||
|
||
$message = new DummyMessage('Hello'); | ||
|
||
$bus = new ParallelMessageBus([], $this->env, $this->debug, $this->projectDir); | ||
|
||
$envelope = $bus->dispatch(new Envelope($message)); | ||
|
||
$this->assertInstanceOf(FutureStamp::class, $envelope->last(FutureStamp::class)); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.