8000 Adding support for DoctrineClearEntityManagerWorkerSubscriber by weaverryan · Pull Request #1043 · doctrine/DoctrineBundle · GitHub
[go: up one dir, main page]

Skip to content

Adding support for DoctrineClearEntityManagerWorkerSubscriber #1043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 5, 2019

Conversation

weaverryan
Copy link
Contributor

Hi!

This wires symfony/symfony#34156 as a service. That PR also removes DoctrineClearEntityManagerMiddleware, which was never released in DoctrineBundle, so we can safely remove it without any BC worries.

The idea (validated if/when #34156 is merged) is that we should run $entityManager->clear() between each message that a worker (messenger:consume) handles. I have not made this configurable - I would rather someone approach us with a valid case for disabling this before doing that. If there is a valid case for wanting this disabled, I don't know what it is - so this would help us learn about such a case.

Thanks!

fabpot added a commit to symfony/symfony that referenced this pull request Oct 31, 2019
…eset EM in worker (weaverryan)

This PR was merged into the 4.4 branch.

Discussion
----------

Adding DoctrineClearEntityManagerWorkerSubscriber to reset EM in worker

| Q             | A
| ------------- | ---
| Branch?       | 4.4
| Bug fix?      | yes & no :)
| New feature?  | yes
| Deprecations? | no
| Tickets       | Fix #34073
| License       | MIT
| Doc PR        | symfony/symfony-docs#12575
| DoctrineBundle PR | doctrine/DoctrineBundle#1043

Hi!

I've seen a few developers get "bit" by an issue recently: after running `messenger:consume`, the 2nd, 3rd, 4th, etc message that are handled are getting out-of-date data from Doctrine. The reason is simple:

A) Consume the 1st message, it queries for `Foo id=1` to do something
B) 10 minutes go by
C) Consume the 2nd message. It also queries for `Foo id=1`, but because this is already in the identity map, Doctrine re-uses the data that is now *10* minutes old.

Even though one worker process handles many messages, the system should (as much as possible) isolate each handler from each other. This is one very practical place we can help people. Also, checking the code, I don't think clearing the entity manager will cause any issues for an EM whose Connection has not been "connected" yet (i.e. it will not cause a connection to be established).

We would wire this in DoctrineBundle, and could make it disable-able... in case that's something that's needed.

Commits
-------

e7b9888 Adding DoctrineClearEntityManagerWorkerSubscriber to reset entity manager in worker
@xabbuh xabbuh added this to the 1.12.0 milestone Nov 1, 2019
@weaverryan
Copy link
Contributor Author

Upstream PR is merged - this should be ready :)

@alcaeus alcaeus self-assigned this Nov 5, 2019
@alcaeus alcaeus added the Feature label Nov 5, 2019
@alcaeus alcaeus merged commit 0d20a98 into doctrine:1.12.x Nov 5, 2019
@alcaeus
Copy link
Member
alcaeus commented Nov 5, 2019

Thanks @weaverryan!

@weaverryan weaverryan deleted the enable-messenger-ping-listener branch November 5, 2019 13:41
@byhoratiss
Copy link
byhoratiss commented Jan 14, 2020

It's being a while since this was merged, but, I'm facing the same problem using Doctrine ORM & ODM (mongodb + mariadb schema) in the same project (unrelated).
Why is not working with Doctrine ODM? From what I investigated it should work since is using an Event to clear the EM, but somehow, using ODM is not working.
Can you give me a hint @weaverryan, and ofc, thank you for this!

Edit: nvm, I figured out, just had to create a different service, injecting DocumentManager and using just one clear(). Ty

@Kalyse
Copy link
Kalyse commented Jan 20, 2021

@weaverryan What's the reason this isn't configurable?

My use case here is that I have a simple FlushAfterLimitMessagesStamp::class which allows me to have each message control when it's flush. The actual implementation here is storing some entities (think of them like log data), they are very lightweight, but I do not want each message to persist, flush, persist flush, persist, flush.

Instead, I want, persist, persist, persist, persist, flush from my messages...

For instance, I might publish 20 messages...

$stamps = array_merge($stamps, [
    new FlushAfterLimitMessagesStamp(),
]);
$this->loggerBus->dispatch($message, $stamps);

My Stamp might look like this:

<?php

namespace Lycan\Providers\CoreBundle\Producer\Stamp;

    use Symfony\Component\Messenger\Stamp\StampInterface;

    class FlushAfterLimitMessagesStamp implements StampInterface
    {
        private int $messageCount;
        private int $timeLimit;

        /**
         * FlushAfterLimitMessagesStamp constructor.
         * Purpose here, is to force a Flush of the EM after X messages.
         * Allowing us to batch up messages,.
         *
         * @param int $messageCount
         * @param int $timeLimit
         */
        public function __construct($messageCount = 50, $timeLimit = 10)
        {
            $this->messageCount = $messageCount;
            $this->timeLimit = $timeLimit;
        }

        public function getTimeLimit(): int
        {
            return $this->timeLimit;
        }

        public function getMessageCount(): int
        {
            return $this->messageCount;
        }
    }

I then have my messages, being handled as normal, however, my Listener, will then subscribe to three events.


  return [
                WorkerStoppedEvent::class => 'onWorkerStopped',
                WorkerMessageReceivedEvent::class => 'onMessageReceived',
                WorkerMessageHandledEvent::class => 'onMessageHandled',
            ];

In full, the class looks like.

After writing it, I was batting my head against a wall wondering why the UOW was always empty on next run, and I see it's because the middleware was removed and replaced with an always clearing subscriber. For my use-case, which I feel is legitimate, I have no way of controlling the transaction for flushing.

What I wanted, is something like...

If idle for 10 seconds, flush.
If 20 messages received, flush
If 50 seconds have passed flush.

However, don't flush after every message.


<?php

namespace Lycan\Providers\CoreBundle\Producer\EventSubscriber;

    use Doctrine\ORM\EntityManagerInterface;
    use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
    use Symfony\Component\EventDispatcher\EventSubscriberInterface;
    use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
    use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
    use Lycan\Providers\CoreBundle\Producer\Stamp\FlushAfterLimitMessagesStamp;

    class FlushAfterLimitMessagesListener implements EventSubscriberInterface
    {
        private EntityManagerInterface $em;

        private int $receivedMessages = 0;
        private ?float $nextTick = null;

        /**
         * WorkerStoppedListener constructor.
         */
        public function __construct(EntityManagerInterface $em)
        {
            $this->em = $em;
        }

        /**
         * @return string[]
         */
        public static function getSubscribedEvents(): array
        {
            return [
                WorkerStoppedEvent::class => 'onWorkerStopped',
                WorkerMessageReceivedEvent::class => 'onMessageReceived',
                WorkerMessageHandledEvent::class => 'onMessageHandled',
            ];
        }

        public function onMessageReceived(WorkerMessageReceivedEvent $event): void
        {
            $stamped = $event->getEnvelope()->last(FlushAfterLimitMessagesStamp::class);
            if ($stamped) {
                assert($stamped instanceof FlushAfterLimitMessagesStamp);
                ++$this->receivedMessages;
                $this->nextTick = $this->nextTick ?? microtime(true) + $stamped->getTimeLimit();
            }
        }

        public function onMessageHandled(WorkerMessageHandledEvent $event): void
        {
            $stamped = $event->getEnvelope()->last(FlushAfterLimitMessagesStamp::class);
            if ($stamped) {
                assert($stamped instanceof FlushAfterLimitMessagesStamp);
                if ($this->receivedMessages >= $stamped->getMessageCount()) {
                    $this->receivedMessages = 0;
                    $this->nextTick = null;
                    $this->em->flush();
                }
                if ($this->nextTick <= microtime(true)) {
                    $this->em->flush();
                    $this->receivedMessages = 0;
                    $this->nextTick = null;
                }
                $this->nextTick = $this->nextTick ?? microtime(true) + ($stamped->getTimeLimit());
            }
        }

        public function onWorkerStopped(WorkerStoppedEvent $stoppedEvent): void
        {
            // We've finished, let's just always flush.
            if ($this->nextTick > 0 || $this->receivedMessages > 0) {
                $this->em->flush();
            }
        }
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants
0