Drivers and Querying

SQLSpec provides a unified driver interface across supported databases. This page covers the common execution patterns and points to adapter-specific configuration.

Supported Drivers (High Level)

  • PostgreSQL: asyncpg, psycopg (sync/async), psqlpy, ADBC

  • SQLite: sqlite3, aiosqlite, ADBC

  • MySQL: asyncmy, mysql-connector, pymysql

  • Analytics / Cloud: DuckDB, BigQuery, Spanner, Oracle, ADBC

Core Execution Pattern

sqlite session
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

db_path = tmp_path / "driver.sqlite"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))

with spec.provide_session(config) as session:
    session.execute("create table if not exists health (id integer primary key)")
    result = session.execute("select count(*) as total from health")
    print(result.one())

Transactions

manual transaction
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

db_path = tmp_path / "transactions.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))

with spec.provide_session(config) as session:
    session.execute("create table if not exists ledger (id integer primary key, note text)")
    session.begin()
    session.execute("insert into ledger (note) values ('committed')")
    session.commit()

    session.begin()
    session.execute("insert into ledger (note) values ('rolled back')")
    session.rollback()

    result = session.execute("select note from ledger order by id")
    print(result.all())

Parameter Binding

positional and named parameters
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

db_path = tmp_path / "params.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))

with spec.provide_session(config) as session:
    session.execute("select :status as status, :status as status_copy", {"status": "active"})
    result = session.execute("select ? as value", (42,))
    print(result.one())

Schema Mapping

Use the schema_type parameter on select(), select_one(), and select_one_or_none() to map result rows to dataclass instances automatically.

schema_type mapping
from dataclasses import dataclass

from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

@dataclass
class User:
    id: int
    name: str

db_path = tmp_path / "schema.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))

with spec.provide_session(config) as session:
    session.execute("create table users (id integer primary key, name text)")
    session.execute("insert into users (name) values ('Alice'), ('Bob')")

    # select returns list of dicts by default
    rows = session.select("select id, name from users order by id")
    print(rows)  # [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

    # Use schema_type to map rows to dataclass instances
    users = session.select("select id, name from users order by id", schema_type=User)
    print(users)  # [User(id=1, name='Alice'), User(id=2, name='Bob')]

    # select_one with schema_type
    user = session.select_one("select id, name from users where name = ?", "Alice", schema_type=User)
    print(user)  # User(id=1, name='Alice')

    # select_one_or_none returns None if no match
    maybe_user = session.select_one_or_none("select id, name from users where name = ?", "Nobody", schema_type=User)
    print(maybe_user)  # None

Scalar Values and Pagination

select_value returns a single scalar from a one-row, one-column result. select_value_or_none returns None when no rows match. select_with_total returns both the data page and the total count for pagination.

scalar values, execute_many, and pagination
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

db_path = tmp_path / "batch.db"
spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": str(db_path)}))

with spec.provide_session(config) as session:
    session.execute("create table users (id integer primary key, name text, email text)")

    # execute_many inserts multiple rows in a single call
    session.execute_many(
        "insert into users (name, email) values (?, ?)",
        [("Alice", "[email protected]"), ("Bob", "[email protected]"), ("Charlie", "[email protected]")],
    )

    # select_value returns a single scalar value
    count = session.select_value("select count(*) from users")
    print(count)  # 3

    # select_value with type conversion
    count_int = session.select_value("select count(*) from users", value_type=int)
    print(count_int)  # 3

    # select_value_or_none returns None when no rows match
    email = session.select_value_or_none("select email from users where name = ?", "Nobody")
    print(email)  # None

    # select_with_total for pagination
    from sqlspec.core import SQL

    query = SQL("select id, name from users").paginate(page=1, page_size=2)
    data, total = session.select_with_total(query)
    print(f"Page has {len(data)} rows, total matching: {total}")

Statement Stacks

Statement stacks bundle multiple SQL statements plus parameter sets. Drivers that support native pipelines or batch execution can send the stack in a single round trip, while others execute each statement sequentially.

statement stack
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
from sqlspec.core.stack import StatementStack

spec = SQLSpec()
config = spec.add_config(SqliteConfig(connection_config={"database": ":memory:"}))

stack = (
    StatementStack()
    .push_execute("create table teams (id integer primary key, name text)")
    .push_execute_many("insert into teams (name) values (:name)", [{"name": "Litestar"}, {"name": "SQLSpec"}])
    .push_execute("select id, name from teams order by id")
)

with spec.provide_session(config) as session:
    results = session.execute_stack(stack)
    rows = results[-1].result.all()

Driver Configuration Examples

asyncpg config
from sqlspec.adapters.asyncpg import AsyncpgConfig

config = AsyncpgConfig(
    connection_config={"dsn": "postgresql://user:pass@localhost:5432/app"},
    pool_config={"min_size": 1, "max_size": 5},
)
cockroach + psycopg
from sqlspec.adapters.cockroach_psycopg import CockroachPsycopgSyncConfig

config = CockroachPsycopgSyncConfig(connection_config={"dsn": "postgresql://user:pass@localhost:26257/defaultdb"})
mysql connector config
from sqlspec.adapters.mysqlconnector import MysqlConnectorSyncConfig

config = MysqlConnectorSyncConfig(
    connection_config={"host": "localhost", "user": "app", "password": "secret", "database": "app_db"}
)