-
-
Notifications
You must be signed in to change notification settings - Fork 32.6k
Fixed #35629 -- Added support for async database connections and cursors. #18408
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
ed91e56
to
83099c9
Compare
3a3fc57
to
e6bbcd1
Compare
1b81214
to
b42cc45
Compare
b42cc45
to
86158fb
Compare
86158fb
to
3be837e
Compare
3be837e
to
288b3c9
Compare
288b3c9
to
df58a82
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sufficiently familiar with the internals of the ORM/database layer to comment on much of the internal logic here, but the general shape of this code and approach seems to make sense to me
(left a few questions, nothing terribly blocking)
ServerCursor implements the logic required to declare and scroll | ||
through named cursors. | ||
|
||
Mixing ClientCursorMixin in wouldn't be necessary if Cursor allowed to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps out of scope for this PR, but I don't think it's clear where this Cursor
comes from. I think it's defined in psycopg3
, but I'm not confident in that.
This should also be clarified in the ServerSideCursor
docstring above too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
anything under Database.
comes from psycopg2
or psycopg3
.
In this specific case, ClientCursonMixin
and AsyncClientCursor
come from psycopg3
ServerSideCursor
Is our custom class that derives from postgres' ClientCursorMixin
and AsyncServerCursor
/ServerCursor
stop = time.monotonic() | ||
duration = stop - start | ||
< 8000 span class="blob-code-inner blob-code-marker-addition"> if use_last_executed_query: | ||
sql = self.db.ops.last_executed_query(self.cursor, sql, params) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might this need to be async?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've checked across all backends, and last_executed_query
is a non-blocking method that basically only does string interpolation / decoding of a string that's already stored on the DBAPI cursor.
259d10c
to
53fae35
Compare
efafc58
to
49141bb
Compare
|
||
async with new_connection() as connection: | ||
async with connection.acursor() as c: | ||
await c.aexecute(...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a question directly for MR it's about general architecture decisions from the DEP 0009 🙂
Why global connection states are fantastic, and why explicitly creating a connection is bad default behavior
1. Drilling arguments
When u open an old aiohttp application with sqlalchemy-core u can find code like this
async def select_messages_by_room_id(session, room_id: int) -> list[Message]:
cursor = await session.scalars(
select(Message).where(Message.room_id == room_id).order_by(Message.id)
)
return cursor.all()
looks easy we have a utility function that receives a session/connection as an argument. but when we use nested functions, it becomes painful
async def end_function_with_required(conn):
async with conn() as c:
...
async def foo(conn):
await bar(conn)
async with conn() as c:
...
async def view(request):
async with new_connection() as conn:
await foo(conn)
that's why people use context var to share a global connection
@asynccontextmanager
async def acquire() -> t.AsyncIterator[SAConnection]:
current_conn = _db_conn.get(None)
if current_conn:
yield current_conn
else:
async with resources.db.acquire() as conn:
token = _db_conn.set(conn)
try:
yield conn
finally:
_db_conn.reset(token)
async def execute(query: QueryType) -> ResultProxy:
async with acquire() as conn:
return await conn.execute(query)
@asynccontextmanager
async def transaction() -> t.AsyncIterator[Transaction]:
async with acquire() as conn, conn.begin_nested() as tr:
_db_tr.set(tr)
yield tr
usage
async def create_notification():
return await execute('INSERT ...')
async def create_notification():
nf = await create_notification()
await rest_client_call(nf.id)
async def view(request):
async with transaction():
await create_some(...)
await send_notification()
I am not sure that examples show this hell related to connection as a parameter but second aproach is mutch mich better (and this is actually default django behavior)
U can tell me. Hold on. Instead of putting a connection as a parameter, u can create separate connections in each function
- for transactions, I can't
- Creating new connections has consequences. consistency and cost
2. Consistency
Simple example with GraphQL. We have a list of users
{
user {
id
name
}
}
and 2 query
- fetch paginated list of user ids
- dataloader that fetch detail info about user by ids
this is a get query, so for that, we can use replicas. we have 1, 5, and 10 replicas no matter but each of them has replica lag, and it is different for each of them
we select a list of users from the master as an example. In this response, we have a fresh user. After that, we selected detailed info from another replica (due to the load balancer) and got an error coz this user doesn't exist due to replica lag
3. Cost
The new connection is not free.
- take time
- use db resources
yes, u can get an advantage when u do call in parallel
async def my_view(request):
# Query authors and books concurrently
task_authors = asyncio.create_task(get_authors("an"))
task_books = asyncio.create_task(get_books("di"))
return render(
request,
"template.html",
{
"books": await task_books,
"authors": await task_authors,
},
)
but when u need something like that
book = await get_book('1')
author_reviews = await get_reviews(book.author_id)
or like this
book = await create_book(...)
return await books_count()
it will take time
about the db server. for Postgres 1k connections, it's not easy if you earn max connection or use all CPU, your server goes down
with this approach, it's easy to lose control over that. the same example with GraphQL, each resolver is a call to the database as a result of connection to the database. u can't control how many fields your client will ask, and it becomes very hard to manage the load coz u don't have a correlation between PRC and load
that's why one connection per request is a good idea.
it's good to have a feature like this, but in my opinion, users have to know what they do, by design, the approach will push use it as is without understanding. it's like with PK as big as default, it's probably not the best option but it solves a lot of problems. the same thing with connection. it's good to have the ability to create few connections per request but by default 1 connection per 1 request is a better choice.
Alternative
By default
from django.db import connections
async with connections["default"].cursor() as cursor:
# Your code here
...
as option
from django.db import connections
async with connections["default"].new() as conn:
async with conn.cursor() as cursor
# Your code here
...
any ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one more thought
I see that in DEP 0009, we use an implicit connection for the ORM.
async def get_authors(pattern):
# Create a new context to call concurrently
async with db.new_connections():
return [
author.name
async for author in Authors.objects.filter(name__icontains=pattern)
]
it looks like we can keep interfaces for sync and async connections the same
from django.db import async_connections
async with async_connections["default"] as conn:
async with conn.acursor() as cursor:
# Your code here ...
for that, we need to do something like that
class AsyncConnectionHandler:
def __getitem__(self, alias):
async_alias = getattr(self._aliases, alias, None)
if async_alias:
return async_alias.get_connection()
async_alias = AsyncAlias()
setattr(self._aliases, alias, async_alias)
conn = self.create_connection(alias)
async_alias.add_connection(conn)
return conn
if u need to do optimization, u can easily create a new connection like this
async def get_authors(pattern):
async with async_connections.new_connection("default"):
return [
author.name
async for author in Authors.objects.filter(name__icontains=pattern)
]
advantages
- lazy/implicit connection creating
- the same interface for ConnectionHandler + connection
- bind new_connection to asynchronous connections (code structure)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand the motivations behind your idea. But I'm unclear on how (or if) your proposal differs with DEP0009. Could you clarify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
creating an explicit connection. It is mainly different. as for me the implicit approach (with we use for sync connections) is much much better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that explicit connection creation is at the core of DEP009.
If you think DEP009 needs to be amended, you should probably start a discussion on the djangoproject forum. But for now this PR is about just implementing DEP009 as accepted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as I said at the top of the current thread
< F438 blockquote>This is not a question directly for MR, it's about general architecture decisions from the DEP 0009 🙂
But for me, it's important to leave comments here to show a pool of problems related to this solution before the merge
(coz after the merge, it will be hard to change due to users who already use it as is).
I'll follow your suggestion and create a discussion on the Django forum to shift the conversation 😌 thx
we can use something like this https://github.com/cbornet/blockbuster with tests to be sure that we don't have any blocking operations as option |
setattr(self._connections, "_stack", conns) | ||
|
||
|
||
class AsyncConnectionHandler: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are a few thoughts about that
- Django has
BaseConnectionHandler
, which provides a base class for managing connections (cache/database etc). We can follow this way and implementBaseAsyncConnectionHandler
and reuse it for cache connection as well - I think keeping the same interface for connection and asynchronous connection is important. It increases the amount of code that we can reuse and, as a result, decrease code duplication. For these purposes, we can organize all logic related to the stack into a
StackLocal
instance
example of the implementation - Arfey@be7f02c
what do u think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fcurella if it looks good for u I can help and prepare MR for your branch
class AsyncConnectionHandler: | ||
""" | ||
Context-aware class to store async connections, mapped by alias name. | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This solution perfectly covers existing requirements. But besides that, we have side effects. I'll try to explain below
example #1
async def get_authors(pattern):
# Create a new context to call concurrently
async with db.new_connections():
return [
author.name
async for author in Authors.objects.filter(name__icontains=pattern)
]
async def get_books(pattern):
# Create a new context to call concurrently
async with db.new_connections():
return [
book.title
async for book in Book.objects.filter(name__icontains=pattern)
]
async def my_view(request):
# Query authors and books concurrently
task_authors = asyncio.create_task(get_authors("an"))
task_books = asyncio.create_task(get_books("di"))
return render(
request,
"template.html",
{
"books": await task_books,
"authors": await task_authors,
},
)
it will work perfectly.
example #2
async def my_view(request):
task_authors = get_authors("an")
task_books = get_books("di")
return render(
request,
"template.html",
{
"books": await task_books,
"authors": await task_authors,
},
)
it still works but looks odd coz we don't have any reason to create a new connection for each sequential request. Anyway, we can do it.
example #3
async def get_authors(pattern):
# without new connection
return [
author.name
async for author in Authors.objects.filter(name__icontains=pattern)
]
async def get_books(pattern):
# create new connection and put it on stack
async with db.new_connections():
return [
book.title
async for book in Book.objects.filter(name__icontains=pattern)
]
# delete connection from the stack
# delete parent connection from the stack
await async_connections['default'].close()
async def my_view(request):
async with db.new_connections():
# Query authors and books concurrently
task_authors = asyncio.create_task(get_authors("an"))
task_books = asyncio.create_task(get_books("di"))
return render(
request,
"template.html",
{
"books": await task_books,
"authors": await task_authors,
},
)
so I have full access to the parent connections (connection stack) and can do everything what I want with it.
I am not a big fan of db.new_connections
😂 so about my "very important" view
get rid of db.new_connections
and use decorator instead
def concurrent_async_db(func):
if not inspect.iscoroutinefunction(func):
raise TypeError("Function decorated with @concurrent_async_db must be an async function")
async def concurrent(*args, **kwargs):
# Clean up or isolate DB connection state before calling the real function
# or something like this async_connections.refresh()
async_connections['default'] = None
return await func(*args, **kwargs)
@wraps(func)
def wrapper(*args, **kwargs):
# we use asyncio.create_task coz without it, the concurrent connection doesn't have any sense
return asyncio.create_task(concurrent(*args, **kwargs))
return wrapper
@concurrent_async_db
async def get_authors(pattern):
return [
author.name
async for author in Authors.objects.filter(name__icontains=pattern)
]
adv
- we don't have cases as, for
example #2
coz we don't create a connection explicit - we don't have access to the parent connections
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given this view:
async def my_view(request):
async with db.new_connections():
# Query authors and books concurrently
task_authors = asyncio.create_task(get_authors("an"))
task_books = asyncio.create_task(get_books("di"))
return render(
request,
"template.html",
{
"books": await task_books,
"authors": await task_authors,
},
)
I think the user should expect the two queries to happen sequentially. The code await
s for task_books
, then await
s again for task_authors
. That's why the keyword is called await
:) "Async" does not mean "in parallel".
We could also have a concurrent_async_db
, but I think it should be a later addition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, the current example works as expected. The problem is that with the existing requirement (I am talking about db.new_connections()), we generate a lot of problems (which I described above), and our end users have to keep in mind all of that to write code without side effects 😔
If our end goal is to give the ability to run queries in parallel, then we have other approaches that don't have the current problems
ps: I really really hate creating an explicit connection 😅
@@ -89,6 +92,8 @@ def _get_varchar_column(data): | |||
class DatabaseWrapper(BaseDatabaseWrapper): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like it would be better to implement a separate class for async.
Advantages:
- No mixed logic between sync and async connections
- Prevents creating a sync connection for an async wrapper and vice versa
- Allows us to keep using method names like execute, fetchone, etc., without adding an 'a' prefix
Disadvantages:
- We're relying on DatabaseWrapper in many places, as seen here. We need to be careful with it
example how we can implement asynchronous connection handler Arfey@6f13cc8
class AsyncConnectionHandler(BaseAsyncConnectionHandler):
...
def create_connection(self, alias):
db = self.settings[alias]
backend = load_backend(db["ENGINE"])
if not hasattr(backend, 'AsyncDatabaseWrapper'):
raise self.exception_class(
f"The async connection '{alias}' doesn't exist."
)
return backend.AsyncDatabaseWrapper(db, alias)
AsyncDatabaseWrapper
- separate database wrapper only for asynchronous operations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One issue that made me decide against separate classes is easy-of-use.
Consider that most users will be starting from an existing sync codebase, and they'd want to async just parts of it.
With two separate classes, they'd have to configure a separate backend - no big deal. But now the async features are on the non-defaul alias, so they'll have to pass the alias every time they need an async connnection.
Basically, they won't be able to call async with db.new_connections()
. They'll have to call async with db.new_connections("my_alias")
every time.
I am a bit conflicted. On one hand, I can see this as a case of 'explicit better than implicit'. OTOH, One of the key features of the Django ORM is this kind of implicit connection management.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider that most users will be starting from an existing sync codebase, and they’d want to async just parts of it. With two separate classes, they'd have to configure a separate backend - no big deal. But now the async features are on the non-defaul alias, so they'll have to pass the alias every time they need an async connnection.
You are right 😌 100% agree with you. Let’s consider the existing code base with async code
await User.objects.aget(id=user_id)
So, current user interacts with our “async” ORM over models. Inside model we can get alias + right connection without changes from the user side.
Current code
class
10000
QuerySet(AltersData):
def count(self):
if self._result_cache is not None:
return len(self._result_cache)
return self.query.get_count(using=self.db)
async def acount(self):
return await sync_to_async(self.count)()
self.db
is our alias
Inside our model's method, we need to do something like that
class RawQuery:
def _execute_query(self):
connection = connections[self.using]
...
async def _execute_query(self):
connection = async_connections[self.using]
...
Looks easy to me 🙂
With two separate classes, they'd have to configure a separate backend - no big deal.
I thought about it, and I think I found an acceptable solution.
DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql",
# ...
},
}
Inside django.db.backends.postgresql We put 2 classes, DatabaseWrapper and AsyncDatabaseWrapper. Also, we have 2 connections, ConnectionHandler and AsyncConnectionHandler, for each of them, we return the appropriate wrapper
connections = ConnectionHandler()
async_connections = AsyncConnectionHandler()
connection = async_connections[DEFAULT_DB_ALIAS]
async with connection.cursor() as cursor:
await cursor.execute("""select 1""")
connection = connections[DEFAULT_DB_ALIAS]
with connection.cursor() as cursor:
cursor.execute("""select 1""")
It works (u can check it in my experimental branch). It follows the same style and works in the same way.
Does it make sense? 😊
Hey 👋 I’ve prepared a small POC where I tried to separate the database wrappers for the sync and async variants. The idea was to explore an alternative to the current approach, where both sync and async methods are duplicated inside a single class (e.g., using acommit alongside commit). While the current solution works, I feel it makes the code harder to follow. Splitting it into two separate implementations introduces some duplication, but keeps the interface cleaner and easier to understand. When you have a moment, please take a look and share your thoughts: 🔗 https://github.com/Arfey/django/pull/1/files Thanks 😌 |
I've thought about the code generation described in this article, and we already have a script that will work well for us. It's easy peasy. Just replace async and await with an empty string. But we have some consequences:
In our case in sync class we have a lot of code which we need to make migration, introspection and etc, and I've deleted it. also i dont understand how to deal with "not simple" async code like that, or async.gather That's why I've decided to prepare a version with full duplication (to avoid changing the existing sync code). As u can see, we don't have a lot of duplication if compared with the current MR. |
Hi @Arfey , I appreciate your review! that's a lot of feedback, so it's going to take me a while to get through it! |
Co-authored-by: Mykhailo Havelia <Arfey17.mg@gmail.com>
55a832b
to
e7727eb
Compare
I’ve moved the discussion about the explicit database connection to the Django forum to keep the code review thread more focused. 😊 Here’s the link to the forum post. |
Trac ticket number
ticket-35629
Branch description
Provide a concise overview of the issue or rationale behind the proposed changes.
Checklist
main
branch.