[Messenger] Provide a deduplication strategy to Queue #54126

Closed
VincentLanglet opened this issue Mar 1, 2024 · 3 comments · Fixed by #54141

Comments

@VincentLanglet
Contributor
VincentLanglet commented Mar 1, 2024

Description

It would be useful to provide an easy way to handle duplicate messages in a queue.
I might be wrong, but I found nothing implemented yet, just a discussion in #49215.

The idea would be to propose something like an interface that a message class could implement:

interface DeduplicatedMessageInterface

If a message implements it, then when I try to publish it, Messenger checks whether the queue already contains a message with the same body. If so, the new message is not added to the queue.

I assume something could be possible with the Symfony Lock component, for instance:

  • Before dispatching the message we check there is no lock on the "message identifier".
    • If there is no lock, we're adding one.
    • If there is already a lock, we're not dispatching the message
  • And in the MessageHandler, we're releasing the lock after the execution of the __invoke method.

But maybe a better implementation exists?

Example

No response

@VincentLanglet
Contributor Author
VincentLanglet commented Mar 1, 2024

I assume something could be possible with Symfony lock for instance:

  • Before dispatching the message we check there is no lock on the "message identifier".

    • If there is no lock, we're adding one.
    • If there is already a lock, we're not dispatching the message
  • And in the MessageHandler, we're releasing the lock after the execution of the __invoke method.

I tried doing this with a middleware. It requires more testing, but it might work with something like:

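use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;

// LockableMessage and LockStamp are application classes (not shown here).
class LockMiddleware implements MiddlewareInterface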
{
    public function __construct(
        private readonly LockFactory $lockFactory,
    ) {
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $message = $envelope->getMessage();

        // If we're trying to dispatch a lockable message.
        if ($message instanceof LockableMessage && null === $envelope->last(ReceivedStamp::class)) {
            $key = $message->getKey();

            if (null !== $key) {
                // The acquire call must be done before stamping the message
                // in order to have the full state of the key in the stamp.
                $canAcquire = $this->lockFactory->createLockFromKey($key, autoRelease: false)->acquire();

                $envelope = $envelope->with(new LockStamp($key));
                if (!$canAcquire) {
                    return $envelope;
                }
            }
        }

        try {
            $envelope = $stack->next()->handle($envelope, $stack);
        } finally {
            // If we've received a lockable message, we're releasing it.
            if (null !== $envelope->last(ReceivedStamp::class)) {
                $stamp = $envelope->last(LockStamp::class);
                if ($stamp instanceof LockStamp) {
                    $key = $stamp->key;
                    $this->lockFactory->createLockFromKey($key, autoRelease: false)->release();
                }
            }
        }

        return $envelope;
    }
}
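
The middleware above relies on two types that are not shown: LockableMessage and LockStamp. Below is a minimal sketch of what they might look like, inferred only from how the middleware uses them ($message->getKey() returning a nullable key, and a public $stamp->key). The RebuildSearchIndex message and its key format are hypothetical, and this assumes the lock store in use supports serializing the Key so it can travel with the envelope.

```php
<?php

declare(strict_types=1);

use Symfony\Component\Lock\Key;
use Symfony\Component\Messenger\Stamp\StampInterface;

// Assumed contract: a message exposes the lock key that identifies it,
// or null when this particular instance should not be deduplicated.
interface LockableMessage
{
    public function getKey(): ?Key;
}

// Carries the acquired Key with the envelope so the consumer side
// can release the very same lock after handling.
final class LockStamp implements StampInterface
{
    public function __construct(
        public readonly Key $key,
    ) {
    }
}

// Hypothetical example message: two messages for the same product
// share a resource name, so only one of them can hold the lock.
final class RebuildSearchIndex implements LockableMessage
{
    public function __construct(
        public readonly int $productId,
    ) {
    }

    public function getKey(): ?Key
    {
        return new Key('rebuild-search-index-'.$this->productId);
    }
}
```

The stamp stores the Key object rather than a plain string because, as far as I understand the Lock component, the Key holds the state needed to release a lock that was acquired in another process.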

@faizanakram99
Contributor
faizanakram99 commented Mar 1, 2024

Many queues, like SQS, support deduplication out of the box. I don't think it is something Symfony should do (read: care about).

@VincentLanglet
Contributor Author

many queues like sqs support deduplication out of the box

The deduplication provided by SQS is different from the one I proposed in this issue,
cf. https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html

Amazon SQS continues to keep track of the message deduplication ID even after the message is received and deleted.

It means that even after a message is handled, you can't add a new message with the same deduplication ID to the queue during the 5-minute deduplication interval.

What I'm looking for is deduplication only while the message is in the queue. As soon as the message is received/deleted, it's possible to add another message.
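
To make the difference concrete, here is a small in-memory sketch of the behavior described above. It is only an illustration: InFlightDedupQueue and the key names are hypothetical, and it uses neither Messenger nor SQS.

```php
<?php

declare(strict_types=1);

// Hypothetical illustration only: a queue that rejects a message
// while another message with the same key is still waiting.
final class InFlightDedupQueue
{
    /** @var list<array{key: string, body: string}> */
    private array $messages = [];

    /** @var array<string, true> keys of messages currently in the queue */
    private array $pendingKeys = [];

    /** Returns false (and drops the message) if the key is already queued. */
    public function enqueue(string $key, string $body): bool
    {
        if (isset($this->pendingKeys[$key])) {
            return false;
        }

        $this->pendingKeys[$key] = true;
        $this->messages[] = ['key' => $key, 'body' => $body];

        return true;
    }

    /** Removes the oldest message and frees its key immediately. */
    public function dequeue(): ?string
    {
        $message = array_shift($this->messages);
        if (null === $message) {
            return null;
        }

        unset($this->pendingKeys[$message['key']]);

        return $message['body'];
    }
}

$queue = new InFlightDedupQueue();
$first = $queue->enqueue('reindex-42', 'v1');  // accepted
$second = $queue->enqueue('reindex-42', 'v2'); // rejected: same key still queued
$handled = $queue->dequeue();                  // frees the key
$third = $queue->enqueue('reindex-42', 'v3');  // accepted again, no waiting period
```

With the SQS behavior quoted above, the third enqueue would still be rejected until the deduplication interval has elapsed. In the middleware approach, the key would be freed once the handler has finished rather than at dequeue time.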

i don't think it is something symfony should do

If it's something which can be fully implemented with only a Symfony middleware and the Lock component, in a fully queue-agnostic way, it would be great to have the feature on the Symfony side.
