-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] implementation of messenger:consume
, which processes messages concurrently
#53964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 7.4
Are you sure you want to change the base?
Conversation
ea02a2e
to
8cf4f65
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
quick review, I like it
src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
Outdated
Show resolved
Hide resolved
8b8e52c
to
66200db
Compare
c1d082a
to
89b22b9
Compare
src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
Outdated
Show resolved
Hide resolved
89b22b9
to
885c278
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nicely done! Just a few questions/comments.
src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
Outdated
Show resolved
Hide resolved
8fac51d
to
f35b5ad
Compare
src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Tests/ParallelMessageBusTest.php
Outdated
Show resolved
Hide resolved
e6b8863
to
5d6bae6
Compare
f976458
to
52e9176
Compare
52e9176
to
522a316
Compare
@@ -282,4 +329,37 @@ public function getMetadata(): WorkerMetadata | |||
{ | |||
return $this->metadata; | |||
} | |||
|
|||
public function handleFutures(string $transportName, $parallelProcessLimit): void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$parallelProcessLimit
seems unused
@IndraGunawan I'd love to, but I can't decide that on my own 😅 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when conflicts will be resolved
It's relevant. BatchHandler will handle processing data synchronously. If I understood well, do you suggest it miss async capabilities? Somehow updating BatchHandler to make it async aware or declaring another BatchAsyncHandler trait could allow processing batch messaging with async mechanisms. |
The purpose of this PR is to enable concurrent handling of Symfony Messenger messages. This PR utilizes the amphp/parallel library to achieve this. During the worker configuration, it is possible to define how many processes should be batched in parallel before flushing, meaning accessing the result, acknowledging, or initiating executions in case of errors.
In the child process, a container is cached. Therefore, in each child process, it will be possible to inject services and make requests, etc.
In this initial use case, the decision was made not to reuse the parent connection, such as the Doctrine connection. This is because it could currently be inconvenient for users, as they would need to modify their handlers, which could be cumbersome.
Even without reusing the parent connection, there is a performance gain since operations are performed concurrently for x number of processes. Therefore, even if there is a handler with blocking code, it does not prevent other child processes from proceeding.
for example, in the case we have 2 handlers and one of the handlers has a 4-second pause, and we have, for instance, 40 messages processed concurrently with a batch size of 10 using the ParallelBus, it's approximately 6 times faster between handling the first message and the last message (This observation was made during a test conducted in our development environment)
In order to dispatch messages concurrently, it is necessary to first consider the ParallelMessageBus:
! It works for async mode - It doesn't work with BatchHandler Trait !
Async mode:
You can specify how many processes will be executed in parallel.
It will define how many processes will be launched - threaded to then complete them one by one and get the return of the message processing
By default: 10
TODO:
This feature is being developed by @coopTilleuls and @TradersPost and it has been designed with @jwage and @dunglas