Description
Hello,
I'm evaluating a use case where queue workers run with a 10-minute "get_notify_timeout" option (i.e. the maximum time pgsqlGetNotify() will synchronously wait for notifications from Postgres), and I found a few problems with it, for which I would like to propose a solution. The problems and solutions are interconnected, so it's probably best to discuss them in one ticket and then, if needed, create separate tickets/PRs (which I can do when the time comes).
Problem no. 1 (critical)
Imagine there are two Doctrine queues, high_priority and low_priority, and a queue worker is started with the following CLI command:
bin/console messenger:consume high_priority low_priority
In other words, the worker will consume messages from both queues, preferring high_priority. Steps to reproduce the problem:
- Have no tasks queued in either queue.
- Start the worker.
- Due to an implementation detail, the following happens:
  - The worker concludes that the "high_priority" queue is empty and makes a note of it. It does not yet sync-wait on Postgres notifications at this point.
  - The worker concludes that the "low_priority" queue is empty and makes a note of it.
  - The worker enters a new iteration of its main while loop.
  - While attempting to get a new task from the "high_priority" queue, the worker executes a blocking sync-wait for a Postgres notification.
- It's important to let the worker block at the sync-wait before continuing with these steps to reproduce. In practice, waiting for about a second is enough.
- Add a new task for the "low_priority" queue. One can do it by hand via an INSERT SQL statement with appropriate data.
- Due to an implementation detail, the following happens:
  - Postgres delivers a notification to the worker that a new record has been added to the messenger_queue db table.
  - The worker "wakes up" from the sync-wait.
  - The worker concludes that the new record is not for the "high_priority" queue and continues to the next queue.
  - The worker attempts to retrieve a task for the "low_priority" queue. Because "queueEmptiedAt" hasn't been cleared for the "low_priority" queue connection, the worker enters a blocking sync-wait on the "low_priority" queue.
In the last step, the worker should have retrieved the new task for the "low_priority" queue, but instead it entered a blocking sync-wait. This is a problem, especially when the timeout for the sync-wait is 10 minutes, as I mentioned before.
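To make the sequence above easier to follow, here is a minimal toy model of it, written in Python rather than as Symfony code. Everything in it is a hypothetical stand-in: ToyQueueConnection, its emptied flag (modelling "queueEmptiedAt"), and the shared notifications deque are assumptions made for illustration, and the model assumes that a notification is consumed by whichever connection waits first.

```python
from collections import deque


class ToyQueueConnection:
    """Hypothetical stand-in for one per-queue connection (not Symfony's class)."""

    def __init__(self, name, rows, notifications, log):
        self.name = name
        self.rows = rows                    # shared table: list of (queue, payload)
        self.notifications = notifications  # shared deque of notified queue names
        self.log = log                      # records what each sync-wait did
        self.emptied = False                # models the "queueEmptiedAt" marker

    def _fetch(self):
        # Plain fetch: take the first row for this queue, or mark the queue empty.
        for i, (queue, payload) in enumerate(self.rows):
            if queue == self.name:
                del self.rows[i]
                self.emptied = False
                return payload
        self.emptied = True
        return None

    def get(self):
        if not self.emptied:
            return self._fetch()
        # Sync-wait: consume one pending notification, or "block until timeout".
        if self.notifications:
            woken_by = self.notifications.popleft()
            self.log.append(f"{self.name}: woken by {woken_by}")
        else:
            woken_by = None
            self.log.append(f"{self.name}: blocked until timeout")
        if woken_by != self.name:
            return None  # the emptied marker is left untouched
        return self._fetch()


def reproduce():
    rows, notifications, log = [], deque(), []
    high = ToyQueueConnection("high_priority", rows, notifications, log)
    low = ToyQueueConnection("low_priority", rows, notifications, log)

    # First pass: both queues are empty, both connections note it.
    high.get()
    low.get()

    # A low_priority task arrives while the worker waits on high_priority.
    rows.append(("low_priority", "task-1"))
    notifications.append("low_priority")

    # Second pass: high_priority consumes the wake-up, low_priority then blocks.
    results = [high.get(), low.get()]
    return results, log, rows
```

In this model, the second pass returns no task for either queue, the last log entry shows low_priority blocking until the timeout, and the new row is still sitting in the table.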
Problem no. 2
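The worker's "--time-limit" option doesn't work as expected while the worker is sync-waiting for new queue tasks: a blocked worker cannot stop until the sync-wait returns, so it may overrun the limit by as much as the sync-wait timeout.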
Problem no. 3
If a delayed task is queued that should be picked up for processing 1 minute from now, and the worker enters a sync-wait with a 10-minute timeout, the delayed task will only be processed after the sync-wait times out (assuming nothing else wakes the worker up in the meantime).
Improvement proposition no. 1
I gave these problems some thought and would like to propose the following changes/improvements.
The sync-wait should not happen in PostgreSqlConnection::get(). Instead, it should happen during the WorkerRunningEvent. Conceptually, the worker should not sync-wait while attempting to retrieve new tasks, but while it is idle (i.e. in the idle sleep if-block). Put differently, instead of the sync-wait answering the question "has a new task potentially been added for queue X?", it should answer the question "has something interesting happened on the messenger_queue db table?".
This would effectively move the majority of the PostgreSqlConnection::get() code to an event listener, and would mean that PostgreSqlConnection no longer needs to override the base class get() implementation. I believe it would also allow the idiosyncrasy in the base class to be removed.
This would fix problem no. 1 mentioned above. It would also make problems no. 2 and no. 3 easier to fix (see the next improvement proposition).
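As a rough illustration of the idea, the following Python toy model (not Symfony code) moves the wait out of the per-queue fetch and into the idle branch of the worker loop. The names fetch, run_worker, and idle_wait are hypothetical, and idle_wait stands in for a single sync-wait on any change to the queue table.

```python
from collections import deque


def fetch(rows, queue):
    """Plain, non-blocking fetch for one queue; keeps no per-queue wait state."""
    for i, (name, payload) in enumerate(rows):
        if name == queue:
            del rows[i]
            return payload
    return None


def run_worker(queues, rows, idle_wait, max_iterations):
    """Poll queues in priority order; sync-wait only when a full pass was idle."""
    handled = []
    for _ in range(max_iterations):
        got_one = False
        for queue in queues:
            payload = fetch(rows, queue)
            if payload is not None:
                handled.append((queue, payload))
                got_one = True
                break  # restart from the highest-priority queue
        if not got_one:
            idle_wait()  # one wait for "something changed on the table"
    return handled


def demo():
    rows = []
    arrivals = deque([("low_priority", "task-1")])

    def idle_wait():
        # Simulates a task being inserted while the worker is waiting;
        # with no arrivals left, this models the wait simply timing out.
        if arrivals:
            rows.append(arrivals.popleft())

    return run_worker(["high_priority", "low_priority"], rows, idle_wait, 3)
```

In this sketch the low_priority task inserted during the idle wait is picked up on the very next pass, because no queue carries a stale "emptied" marker that forces a second wait.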
Improvement proposition no. 2
With improvement proposition no. 1 in place, the worker could choose the sync-wait timeout more intelligently. I propose that the timeout value for pgsqlGetNotify() be chosen in the following way:
- Query the queue db table to see if there is at least one pending queue task for the queues that the worker has been started for. If yes, do not sync-wait at all.
  - This potentially prevents the worker from being pwned by the postgres' notify grouping behaviour, but on second thought, it might not be needed at all, considering the fact that the worker's idle-wait if-block isn't executed when there was at least 1 task processed in the past while-loop.
- Query the queue db table to see if there are any delayed queue tasks for the queues that the worker has been started for. Pick the one that becomes available for processing soonest.
- Compute the worker's remaining time, as requested by the --time-limit cli command option.
- Out of the 3 time values (the soonest delayed task, the remaining --time-limit, and the configured get_notify_timeout option) choose the lowest one, and use it for the pgsqlGetNotify() sync-wait call.
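The selection rule in the list above can be sketched as a small pure function. This is an illustration in Python, not a proposed implementation: the function name, parameter names, and the choice of milliseconds are all assumptions, and None is used to mean "no delayed task" or "no --time-limit given".

```python
from typing import Optional


def choose_notify_timeout_ms(
    has_pending: bool,
    next_delayed_in_ms: Optional[int],
    time_limit_remaining_ms: Optional[int],
    get_notify_timeout_ms: int,
) -> int:
    """Pick the sync-wait timeout as the smallest of the applicable limits."""
    if has_pending:
        return 0  # a task is already waiting: do not sync-wait at all
    candidates = [get_notify_timeout_ms]
    if next_delayed_in_ms is not None:
        candidates.append(next_delayed_in_ms)
    if time_limit_remaining_ms is not None:
        candidates.append(time_limit_remaining_ms)
    # Clamp at zero in case a delayed task or the time limit is already due.
    return max(0, min(candidates))


# A delayed task due in 1 minute caps a 10-minute configured timeout.
one_minute = choose_notify_timeout_ms(False, 60_000, None, 600_000)

# A worker with 30 seconds of --time-limit left waits at most 30 seconds.
thirty_seconds = choose_notify_timeout_ms(False, 60_000, 30_000, 600_000)
```

With the scenario from problem no. 3 (a task due in 1 minute and a 10-minute get_notify_timeout), the function yields a 1-minute wait instead of a 10-minute one.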
This fixes problems no. 2 and no. 3. It is perhaps also conceptually simpler, as the sync-waiting logic is cleanly isolated in its own Symfony event listener, rather than being part of the overridden PostgreSqlConnection::get() method.
Closing words
Implementing the sync-waiting as proposed above would also conceptually usher in a solid opinion on how sync-waiting should be done in Messenger (compared to polling), i.e. the sync-waiting transport should be recommended to sync-wait on any change happening on the underlying queue(s), rather than only sync-waiting on one queue's new tasks.
I truly believe that this is the way to move forward with this, and that it would have immense value for the ecosystem (one benefit being that --time-limit, multi-queue workers, and delayed queue tasks would Just Work™ properly, even while the worker is sync-waiting). I can prepare the code change if the proposition is accepted.
Thank you.
Example
No real example, as the ticket proposes an underlying implementation change rather than a dev-facing interface change (although the change allows more Messenger features to work while sync-waiting).