8000 [Messenger] implementation of `messenger:consume`, which processes messages concurrently by alli83 · Pull Request #53964 · symfony/symfony · GitHub
[go: up one dir, main page]

Skip to content

[Messenger] implementation of messenger:consume, which processes messages concurrently #53964

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 7.4
Choose a base branch
from

Conversation

alli83
Copy link
Contributor
@alli83 alli83 commented Feb 16, 2024
Q A
Branch? 7.3
Bug fix? no
New feature? yes
Deprecations? no
Issues
License MIT

The purpose of this PR is to enable concurrent handling of Symfony Messenger messages. This PR utilizes the amphp/parallel library to achieve this. During the worker configuration, it is possible to define how many processes should be batched in parallel before flushing, meaning accessing the result, acknowledging, or initiating executions in case of errors.

In the child process, a container is cached. Therefore, in each child process, it will be possible to inject services and make requests, etc.

In this initial use case, the decision was made not to reuse the parent connection, such as the Doctrine connection. This is because it could currently be inconvenient for users, as they would need to modify their handlers, which could be cumbersome.

Even without reusing the parent connection, there is a performance gain since operations are performed concurrently for x number of processes. Therefore, even if there is a handler with blocking code, it does not prevent other child processes from proceeding.
for example, in the case we have 2 handlers and one of the handlers has a 4-second pause, and we have, for instance, 40 messages processed concurrently with a batch size of 10 using the ParallelBus, it's approximately 6 times faster between handling the first message and the last message (This observation was made during a test conducted in our development environment)

In order to dispatch messages concurrently, it is necessary to first consider the ParallelMessageBus:
! It works for async mode - It doesn't work with BatchHandler Trait !

Async mode:

$this->bus->dispatch(new UserMessage($user->getId()), [new BusNameStamp('parallel_bus')]);

You can specify how many processes will be executed in parallel.
It will define how many processes will be launched - threaded to then complete them one by one and get the return of the message processing
By default: 10

php bin/console messenger:consume async -p 20

TODO:

  • CHANGELOG:

This feature is being developed by @coopTilleuls and @TradersPost and it has been designed with @jwage and @dunglas

@carsonbot carsonbot added this to the 7.1 milestone Feb 16, 2024
@carsonbot carsonbot changed the title [Messenger] implementation of messenger:consume, which processes messages concurrently [Messenger] implementation of messenger:consume, which processes messages concurrently Feb 16, 2024
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch from ea02a2e to 8cf4f65 Compare February 16, 2024 08:03
Copy link
Member
@lyrixx lyrixx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quick review, I like it

@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch 2 times, most recently from 8b8e52c to 66200db Compare February 16, 2024 08:16
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch 3 times, most recently from c1d082a to 89b22b9 Compare February 16, 2024 08:42
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch from 89b22b9 to 885c278 Compare February 16, 2024 13:36
Copy link
Contributor
@jwage jwage left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nicely done! Just a few questions/comments.

@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch 11 times, most recently from 8fac51d to f35b5ad Compare February 25, 2024 22:34
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch from e6b8863 to 5d6bae6 Compare August 17, 2024 06:24
@alli83 alli83 marked this pull request as draft August 19, 2024 13:53
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch 5 times, most recently from f976458 to 52e9176 Compare August 20, 2024 01:15
@alli83 alli83 force-pushed the messenger-handling-concurrent-messages branch from 52e9176 to 522a316 Compare August 20, 2024 03:56
@alli83 alli83 marked this pull request as ready for review August 20, 2024 21:53
@@ -282,4 +329,37 @@ public function getMetadata(): WorkerMetadata
{
return $this->metadata;
}

public function handleFutures(string $transportName, $parallelProcessLimit): void
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$parallelProcessLimit seems unused

@IndraGunawan
Copy link
Contributor
IndraGunawan commented Oct 4, 2024

@dunglas @alli83 any chance this feature will be included in v7.2 release?

@dunglas
Copy link
Member
dunglas commented Oct 4, 2024

@IndraGunawan I'd love to, but I can't decide that on my own 😅

@fabpot fabpot modified the milestones: 7.2, 7.3 Nov 20, 2024
Copy link
Member
@dunglas dunglas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when conflicts will be resolved

@matyo91
Copy link
Contributor
matyo91 commented Mar 28, 2025

! It works for async mode - It doesn't work with BatchHandler Trait !

It's relevant. BatchHandler will handle processing data synchronously. If I understood well, do you suggest it miss async capabilities? Somehow updating BatchHandler to make it async aware or declaring another BatchAsyncHandler trait could allow processing batch messaging with async mechanisms.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

0