Fixed #35629 -- Added support for async database connections and cursors. by fcurella · Pull Request #18408 · django/django · GitHub

Fixed #35629 -- Added support for async database connections and cursors. #18408


Open · wants to merge 8 commits into main from async-orm-cursor

Conversation

fcurella
Contributor

Trac ticket number

ticket-35629

Branch description

Provide a concise overview of the issue or rationale behind the proposed changes.

Checklist

  • This PR targets the main branch.
  • The commit message is written in past tense, mentions the ticket number, and ends with a period.
  • I have checked the "Has patch" ticket flag in the Trac system.
  • I have added or updated relevant tests.
  • I have added or updated relevant docs, including release notes if applicable.
  • I have attached screenshots in both light and dark modes for any UI changes.

@fcurella fcurella changed the title Fixed #35629. Async DB Connection and Cursor Draft: Fixed #35629. Async DB Connection and Cursor Jul 24, 2024
@fcurella fcurella force-pushed the async-orm-cursor branch 6 times, most recently from 3a3fc57 to e6bbcd1 on July 25, 2024 20:03
@fcurella fcurella changed the title Draft: Fixed #35629. Async DB Connection and Cursor Fixed #35629. Async DB Connection and Cursor Jul 25, 2024
@fcurella fcurella force-pushed the async-orm-cursor branch 5 times, most recently from 1b81214 to b42cc45 on August 9, 2024 13:47
@carltongibson
Member

@fcurella Thanks for pushing this! I'm going to have a potter on it, and I'll see if @felixxm has a cycle too.

At first glance it looks very minimal. 👍

Contributor
@bigfootjon bigfootjon left a comment


I'm not sufficiently familiar with the internals of the ORM/database layer to comment on much of the internal logic here, but the general shape of this code and approach seems to make sense to me

(left a few questions, nothing terribly blocking)

ServerCursor implements the logic required to declare and scroll
through named cursors.

Mixing ClientCursorMixin in wouldn't be necessary if Cursor allowed to
Contributor


Perhaps out of scope for this PR, but I don't think it's clear where this Cursor comes from. I think it's defined in psycopg3, but I'm not confident in that.

This should also be clarified in the ServerSideCursor docstring above too.

Contributor Author
@fcurella fcurella Oct 23, 2024


Anything under Database. comes from psycopg2 or psycopg3.

In this specific case, ClientCursorMixin and AsyncClientCursor come from psycopg3.

ServerSideCursor is our custom class that derives from psycopg3's ClientCursorMixin and AsyncServerCursor/ServerCursor.
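For orientation, a heavily hedged illustration of where these names live, assuming psycopg 3's public exports (the exact composition in the PR's backend code may differ):

import psycopg as Database  # "Database" is whatever DB-API module the backend imported

Database.ClientCursor       # psycopg 3's sync client-side-binding cursor
Database.AsyncClientCursor  # psycopg 3's async client-side-binding cursor
Database.ServerCursor       # psycopg 3's sync named (server-side) cursor
Database.AsyncServerCursor  # psycopg 3's async named (server-side) cursor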

stop = time.monotonic()
duration = stop - start
if use_last_executed_query:
    sql = self.db.ops.last_executed_query(self.cursor, sql, params)
Contributor


Might this need to be async?

Contributor Author
@fcurella fcurella Oct 23, 2024


I've checked across all backends, and last_executed_query is a non-blocking method that basically only does string interpolation / decoding of a string that's already stored on the DBAPI cursor.

@fcurella fcurella force-pushed the async-orm-cursor branch 2 times, most recently from 259d10c to 53fae35 on October 22, 2024 15:26
@fcurella fcurella force-pushed the async-orm-cursor branch 3 times, most recently from efafc58 to 49141bb on October 24, 2024 14:24

async with new_connection() as connection:
    async with connection.acursor() as c:
        await c.aexecute(...)
@Arfey Arfey Apr 12, 2025


This is not a question directly for this MR; it's about general architecture decisions from DEP 0009 🙂


Why global connection states are fantastic, and why explicitly creating a connection is bad default behavior

1. Drilling arguments

When you open an old aiohttp application with sqlalchemy-core, you can find code like this:

async def select_messages_by_room_id(session, room_id: int) -> list[Message]:
    cursor = await session.scalars(
        select(Message).where(Message.room_id == room_id).order_by(Message.id)
    )

    return cursor.all()

Looks easy: we have a utility function that receives a session/connection as an argument. But when we use nested functions, it becomes painful:

async def end_function_with_required(conn):
    async with conn() as c:
        ...

async def foo(conn):
    await end_function_with_required(conn)
    
    async with conn() as c:
        ...

async def view(request):
    async with new_connection() as conn:
        await foo(conn)

That's why people use a context var to share a global connection:

@asynccontextmanager
async def acquire() -> t.AsyncIterator[SAConnection]:
    current_conn = _db_conn.get(None)
    if current_conn:
        yield current_conn
    else:
        async with resources.db.acquire() as conn:
            token = _db_conn.set(conn)
            try:
                yield conn
            finally:
                _db_conn.reset(token)


async def execute(query: QueryType) -> ResultProxy:
    async with acquire() as conn:
        return await conn.execute(query)


@asynccontextmanager
async def transaction() -> t.AsyncIterator[Transaction]:
    async with acquire() as conn, conn.begin_nested() as tr:
        _db_tr.set(tr)
        yield tr

usage

async def create_notification():
    return await execute('INSERT ...')

async def send_notification():
    nf = await create_notification()
    await rest_client_call(nf.id)

async def view(request):
    async with transaction():
        await create_some(...)
        await send_notification()

I am not sure these examples fully show the hell of passing a connection as a parameter, but the second approach is much, much better (and this is actually the default Django behavior).

You can tell me: hold on, instead of passing a connection as a parameter, you can create separate connections in each function. But:

  • for transactions, I can't
  • creating new connections has consequences: consistency and cost

2. Consistency

A simple example with GraphQL. We have a list of users:

{
   user {
       id
       name
   }
}

and 2 queries:

  • fetch a paginated list of user ids
  • a dataloader that fetches detailed info about a user by ids

This is a read query, so we can use replicas. Whether we have 1, 5, or 10 replicas doesn't matter, but each of them has replica lag, and it differs between them.

As an example, we select the list of users from the master; in this response we have a fresh user. After that, we select the detailed info from another replica (due to the load balancer) and get an error because this user doesn't exist there yet due to replica lag.
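To make the hazard concrete, here is a hypothetical sketch in the DEP 0009 style used elsewhere in this thread (db.new_connections(), the User model, and routing to replicas are all assumptions for illustration, not this PR's behavior):

async def user_ids():
    # May be served by the primary.
    async with db.new_connections():
        return [u.id async for u in User.objects.order_by("-id")[:10]]

async def user_details(ids):
    # May be served by a lagging replica.
    async with db.new_connections():
        return [u async for u in User.objects.filter(id__in=ids)]

async def resolve_users():
    ids = await user_ids()
    # A freshly committed user returned above can be missing here if this
    # second connection reads from a replica that has not caught up yet.
    return await user_details(ids)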

3. Cost

The new connection is not free.

  • it takes time
  • it uses DB resources

Yes, you can get an advantage when you make calls in parallel:

async def my_view(request):
    # Query authors and books concurrently
    task_authors = asyncio.create_task(get_authors("an"))
    task_books = asyncio.create_task(get_books("di"))
    return render(
        request,
        "template.html",
        {
            "books": await task_books,
            "authors": await task_authors,
        },
    )

But when you need something like this:

book = await get_book('1')
author_reviews = await get_reviews(book.author_id)

or like this

book = await create_book(...)
return await books_count()

it will take time.

About the DB server: for Postgres, 1k connections is not easy; if you reach the max connections or use all the CPU, your server goes down.

With this approach, it's easy to lose control over that. Take the same GraphQL example: each resolver is a call to the database and, as a result, a connection to the database. You can't control how many fields your client will ask for, and it becomes very hard to manage the load because you don't have a correlation between the number of requests and the load.

That's why one connection per request is a good idea.


It's good to have a feature like this, but in my opinion users have to know what they're doing; by design, this approach will push them to use it as-is without understanding. It's like BigAutoField as the default PK: it's probably not the best option, but it solves a lot of problems. The same thing with connections: it's good to have the ability to create a few connections per request, but by default one connection per request is a better choice.

Alternative

By default

from django.db import connections

async with connections["default"].cursor() as cursor:
    # Your code here
    ...

as an option

from django.db import connections

async with connections["default"].new() as conn:
    async with conn.cursor() as cursor:
        # Your code here
        ...

any ideas?


one more thought

I see that in DEP 0009, we use an implicit connection for the ORM.

async def get_authors(pattern):
    # Create a new context to call concurrently
    async with db.new_connections():
        return [
            author.name
            async for author in Authors.objects.filter(name__icontains=pattern)
        ]

it looks like we can keep interfaces for sync and async connections the same

from django.db import async_connections


async with async_connections["default"] as conn:
    async with conn.acursor() as cursor:
        # Your code here ...

For that, we need to do something like this:

class AsyncConnectionHandler:

    def __getitem__(self, alias):
        async_alias = getattr(self._aliases, alias, None)

        if async_alias:
            return async_alias.get_connection() 

        async_alias = AsyncAlias()
        setattr(self._aliases, alias, async_alias)

        conn = self.create_connection(alias)
        async_alias.add_connection(conn)

        return conn

If you need to optimize, you can easily create a new connection like this:

async def get_authors(pattern):
    async with async_connections.new_connection("default"):
        return [
            author.name
            async for author in Authors.objects.filter(name__icontains=pattern)
        ]

Advantages:

  • lazy/implicit connection creation
  • the same interface for ConnectionHandler + connection
  • new_connection is bound to asynchronous connections (code structure)

Contributor Author


I understand the motivations behind your idea. But I'm unclear on how (or if) your proposal differs from DEP 0009. Could you clarify?


Creating an explicit connection; that is the main difference. As for me, the implicit approach (which we use for sync connections) is much, much better.

Contributor Author


It seems to me that explicit connection creation is at the core of DEP009.

If you think DEP009 needs to be amended, you should probably start a discussion on the djangoproject forum. But for now this PR is about just implementing DEP009 as accepted.


Yes, as I said at the top of the current thread

This is not a question directly for this MR; it's about general architecture decisions from DEP 0009 🙂

But for me, it's important to leave comments here to show the pool of problems related to this solution before the merge (because after the merge, it will be hard to change due to users who already use it as-is).

I'll follow your suggestion and create a discussion on the Django forum to shift the conversation 😌 thx

@Arfey
Arfey commented Apr 12, 2025

We can use something like https://github.com/cbornet/blockbuster in tests, as an option, to be sure that we don't have any blocking operations.
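A minimal sketch of how that could look in a pytest setup, assuming blockbuster's blockbuster_ctx context manager (check the project's README for the exact API before relying on this):

import pytest
from blockbuster import blockbuster_ctx  # assumed API, per the project's README

@pytest.fixture(autouse=True)
def detect_blocking_calls():
    # Raises whenever a known blocking call (file I/O, sockets, time.sleep, ...)
    # happens inside the running event loop while a test is executing.
    with blockbuster_ctx():
        yield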

setattr(self._connections, "_stack", conns)


class AsyncConnectionHandler:

Here are a few thoughts about that:

  • Django has BaseConnectionHandler, which provides a base class for managing connections (cache/database etc.). We can follow the same path and implement BaseAsyncConnectionHandler and reuse it for cache connections as well.
  • I think keeping the same interface for a connection and an asynchronous connection is important. It increases the amount of code that we can reuse and, as a result, decreases code duplication. For these purposes, we can organize all the logic related to the stack into a StackLocal instance.

Example of the implementation: Arfey@be7f02c
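A rough, self-contained sketch of what a contextvars-based stack along these lines could look like (illustrative names only, not the linked commit's code):

import contextvars

class StackLocal:
    """Per-async-context stacks of values, keyed by name."""

    def __init__(self):
        self._stacks = contextvars.ContextVar("stacks")

    def push(self, name, value):
        stacks = dict(self._stacks.get({}))
        stacks[name] = stacks.get(name, ()) + (value,)
        self._stacks.set(stacks)

    def top(self, name):
        # Tasks started with asyncio.create_task() copy the current context,
        # so pushes made inside a task do not leak back to the caller.
        stack = self._stacks.get({}).get(name, ())
        return stack[-1] if stack else None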

What do you think?


@fcurella if it looks good to you, I can help and prepare an MR for your branch.

class AsyncConnectionHandler:
    """
    Context-aware class to store async connections, mapped by alias name.
    """

This solution perfectly covers the existing requirements. But besides that, it has side effects. I'll try to explain below.

example #1

async def get_authors(pattern):
    # Create a new context to call concurrently
    async with db.new_connections():
        return [
            author.name
            async for author in Authors.objects.filter(name__icontains=pattern)
        ]

async def get_books(pattern):
    # Create a new context to call concurrently
    async with db.new_connections():
        return [
            book.title
            async for book in Book.objects.filter(name__icontains=pattern)
        ]

async def my_view(request):
    # Query authors and books concurrently
    task_authors = asyncio.create_task(get_authors("an"))
    task_books = asyncio.create_task(get_books("di"))
    return render(
        request,
        "template.html",
        {
            "books": await task_books,
            "authors": await task_authors,
        },
    )

it will work perfectly.

example #2

async def my_view(request):
    task_authors = get_authors("an")
    task_books = get_books("di")
    return render(
        request,
        "template.html",
        {
            "books": await task_books,
            "authors": await task_authors,
        },
    )

It still works but looks odd, because we don't have any reason to create a new connection for each sequential request. Anyway, we can do it.

example #3

async def get_authors(pattern):
    # without new connection
    return [
        author.name
        async for author in Authors.objects.filter(name__icontains=pattern)
    ]

async def get_books(pattern):
    # create new connection and put it on stack
    async with db.new_connections():
        return [
            book.title
            async for book in Book.objects.filter(name__icontains=pattern)
        ]
        # delete connection from the stack

    # delete parent connection from the stack
    await async_connections['default'].close()

async def my_view(request):
    async with db.new_connections():
        # Query authors and books concurrently
        task_authors = asyncio.create_task(get_authors("an"))
        task_books = asyncio.create_task(get_books("di"))

        return render(
            request,
            "template.html",
            {
                "books": await task_books,
                "authors": await task_authors,
            },
        )

So I have full access to the parent connections (the connection stack) and can do whatever I want with it.

I am not a big fan of db.new_connections 😂 so here is my "very important" view:

Get rid of db.new_connections and use a decorator instead:

import asyncio
import inspect
from functools import wraps


def concurrent_async_db(func):
    if not inspect.iscoroutinefunction(func):
        raise TypeError("Function decorated with @concurrent_async_db must be an async function")

    async def concurrent(*args, **kwargs):
        # Clean up or isolate DB connection state before calling the real function
        # or something like this async_connections.refresh()
        async_connections['default'] = None
        return await func(*args, **kwargs)

    @wraps(func)
    def wrapper(*args, **kwargs):
        # we use asyncio.create_task because without it, the concurrent connection doesn't make sense
        return asyncio.create_task(concurrent(*args, **kwargs))

    return wrapper

@concurrent_async_db
async def get_authors(pattern):
    return [
        author.name
        async for author in Authors.objects.filter(name__icontains=pattern)
    ]

Advantages:

  • we don't have cases like example #2, because we don't create a connection explicitly
  • we don't have access to the parent connections

Contributor Author


Given this view:

async def my_view(request):
    async with db.new_connections():
        # Query authors and books concurrently
        task_authors = asyncio.create_task(get_authors("an"))
        task_books = asyncio.create_task(get_books("di"))

        return render(
            request,
            "template.html",
            {
                "books": await task_books,
                "authors": await task_authors,
            },
        )

I think the user should expect the two queries to happen sequentially. The code awaits task_books, then awaits again for task_authors. That's why the keyword is called await :) "Async" does not mean "in parallel".

We could also have a concurrent_async_db, but I think it should be a later addition.


Sure, the current example works as expected. The problem is that with the existing requirement (I am talking about db.new_connections()), we generate a lot of problems (which I described above), and our end users have to keep all of that in mind to write code without side effects 😔

If our end goal is to give the ability to run queries in parallel, then we have other approaches that don't have the current problems

ps: I really really hate creating an explicit connection 😅

@@ -89,6 +92,8 @@ def _get_varchar_column(data):
class DatabaseWrapper(BaseDatabaseWrapper):

It seems like it would be better to implement a separate class for async.

Advantages:

  • No mixed logic between sync and async connections
  • Prevents creating a sync connection for an async wrapper and vice versa
  • Allows us to keep using method names like execute, fetchone, etc., without adding an 'a' prefix

Disadvantages:

  • We're relying on DatabaseWrapper in many places, as seen here. We need to be careful with it

An example of how we can implement an asynchronous connection handler: Arfey@6f13cc8

class AsyncConnectionHandler(BaseAsyncConnectionHandler):

    ...

    def create_connection(self, alias):
        db = self.settings[alias]
        backend = load_backend(db["ENGINE"])

        if not hasattr(backend, "AsyncDatabaseWrapper"):
            raise self.exception_class(
                f"The async connection '{alias}' doesn't exist."
            )

        return backend.AsyncDatabaseWrapper(db, alias)

AsyncDatabaseWrapper is a separate database wrapper only for asynchronous operations.

Contributor Author


One issue that made me decide against separate classes is ease of use.

Consider that most users will be starting from an existing sync codebase, and they'd want to async just parts of it.

With two separate classes, they'd have to configure a separate backend, which is no big deal. But now the async features are on a non-default alias, so they'll have to pass the alias every time they need an async connection.

Basically, they won't be able to call async with db.new_connections(). They'll have to call async with db.new_connections("my_alias") every time.

I am a bit conflicted. On one hand, I can see this as a case of "explicit is better than implicit". On the other hand, one of the key features of the Django ORM is this kind of implicit connection management.


Consider that most users will be starting from an existing sync codebase, and they'd want to async just parts of it. With two separate classes, they'd have to configure a separate backend, which is no big deal. But now the async features are on a non-default alias, so they'll have to pass the alias every time they need an async connection.

You are right 😌 100% agree with you. Let's consider an existing code base with async code:

await User.objects.aget(id=user_id)

So the current user interacts with our "async" ORM through models. Inside the model, we can get the alias and the right connection without any changes on the user's side.

Current code

class QuerySet(AltersData):
    def count(self):
        if self._result_cache is not None:
            return len(self._result_cache)

        return self.query.get_count(using=self.db)

    async def acount(self):
        return await sync_to_async(self.count)()

self.db is our alias

Inside our model's method, we need to do something like this:

class RawQuery:

    def _execute_query(self):
        connection = connections[self.using]
        ...

    async def _execute_query(self):
        connection = async_connections[self.using]
        ...

Looks easy to me 🙂

With two separate classes, they'd have to configure a separate backend, which is no big deal.

I thought about it, and I think I found an acceptable solution.

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        # ...
    },
}

Inside django.db.backends.postgresql we put two classes, DatabaseWrapper and AsyncDatabaseWrapper. Also, we have two connection handlers, ConnectionHandler and AsyncConnectionHandler, and each of them returns the appropriate wrapper:

connections = ConnectionHandler()
async_connections = AsyncConnectionHandler()
connection = async_connections[DEFAULT_DB_ALIAS]

async with connection.cursor() as cursor:
    await cursor.execute("""select 1""")
connection = connections[DEFAULT_DB_ALIAS]

with connection.cursor() as cursor:
    cursor.execute("""select 1""")

It works (you can check it in my experimental branch). It follows the same style and works in the same way.

Does it make sense? 😊

@Arfey
Arfey commented Apr 22, 2025

@fcurella @carltongibson

Hey 👋

I’ve prepared a small POC where I tried to separate the database wrappers for the sync and async variants.

The idea was to explore an alternative to the current approach, where both sync and async methods are duplicated inside a single class (e.g., using acommit alongside commit). While the current solution works, I feel it makes the code harder to follow. Splitting it into two separate implementations introduces some duplication, but keeps the interface cleaner and easier to understand.
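To illustrate the two shapes being compared (class names are illustrative and method bodies are elided; this is not the PR's or the POC's actual code):

# Current approach in this PR: one wrapper exposes both sync and async methods.
class CombinedDatabaseWrapper:
    def commit(self): ...
    async def acommit(self): ...


# POC approach: two wrappers, each with a single, unprefixed interface.
class SyncDatabaseWrapper:
    def commit(self): ...


class AsyncDatabaseWrapper:
    async def commit(self): ...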

When you have a moment, please take a look and share your thoughts:

🔗 https://github.com/Arfey/django/pull/1/files

Thanks 😌

PS: @fcurella it might be interesting for you 😀

@fcurella
Contributor Author
fcurella commented Apr 22, 2025

@Arfey I think more duplication is acceptable IF we write it in such a way that we can use codegen to maintain it. @rtpg, can you look at @Arfey's code and see if codegen could work with that?

@Arfey
Arfey commented Apr 22, 2025

@Arfey I think more duplication is acceptable IF we write it in such a way that we can use codegen to maintain it. @rtpg, can you look at @Arfey's code and see if codegen could work with that?

I've thought about the code generation described in this article, and we already have a script that will work well for us. It's easy peasy: just replace async and await with an empty string.
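A very rough sketch of that idea (the file paths and the function name are hypothetical; real codegen, e.g. the unasync approach, also renames "a"-prefixed methods and works on the AST rather than with regexes):

import re
from pathlib import Path

def unasyncify(source: str) -> str:
    # Strip the async/await keywords; everything else stays as-is.
    source = re.sub(r"\basync def\b", "def", source)
    source = re.sub(r"\basync (for|with)\b", r"\1", source)
    source = re.sub(r"\bawait\s+", "", source)
    return source

# Hypothetical layout: generate the sync module from the async one.
async_src = Path("django/db/backends/base/abase.py")
sync_dst = Path("django/db/backends/base/base.py")
sync_dst.write_text(unasyncify(async_src.read_text()))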

But we have some consequences:

  • We need to write the async code first and generate the sync code from it
  • We need to have a mirror implementation for the sync and asynchronous versions

In our case, the sync class has a lot of code that we need for migrations, introspection, etc., and I've deleted it.

Also, I don't understand how to deal with "not simple" async code like that, or with asyncio.gather.

That's why I've decided to prepare a version with full duplication (to avoid changing the existing sync code). As you can see, we don't have a lot of duplication compared with the current MR.

@fcurella
Contributor Author
fcurella commented May 5, 2025

Hi @Arfey ,

I appreciate your review! That's a lot of feedback, so it's going to take me a while to get through it!

@fcurella fcurella force-pushed the async-orm-cursor branch 5 times, most recently from 55a832b to e7727eb on May 5, 2025 18:54
@fcurella fcurella force-pushed the async-orm-cursor branch from e7727eb to 126f15a on May 5, 2025 19:02
@Arfey
Arfey commented May 8, 2025

I’ve moved the discussion about the explicit database connection to the Django forum to keep the code review thread more focused. 😊

Here’s the link to the forum post.
