# cli.py (615 lines, 512 loc, 25 KB) - forked from litestar-org/sqlspec
# ruff: noqa: C901
import inspect
import sys
from collections.abc import Sequence
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
import rich_click as click
from click.core import ParameterSource
if TYPE_CHECKING:
from rich_click import Group
from sqlspec.config import AsyncDatabaseConfig, SyncDatabaseConfig
from sqlspec.migrations.commands import AsyncMigrationCommands, SyncMigrationCommands
__all__ = ("add_migration_commands", "get_sqlspec_group")
def get_sqlspec_group() -> "Group":
    """Build the root ``sqlspec`` Click group.

    The group callback resolves the ``--config`` dotted path and stores the
    loaded config objects (always as a list) under ``ctx.obj["configs"]``,
    together with the ``validate_config`` flag, for subcommands to read.

    Returns:
        The SQLSpec CLI group.
    """

    @click.group(name="sqlspec")
    @click.option(
        "--config",
        help="Dotted path to SQLSpec config(s) or callable function (e.g. 'myapp.config.get_configs')",
        required=True,
        type=str,
    )
    @click.option(
        "--validate-config", is_flag=True, default=False, help="Validate configuration before executing migrations"
    )
    @click.pass_context
    def sqlspec_group(ctx: "click.Context", config: str, validate_config: bool) -> None:
        """SQLSpec CLI commands."""
        from rich import get_console

        from sqlspec.exceptions import ConfigResolverError
        from sqlspec.utils.config_resolver import resolve_config_sync

        console = get_console()
        ctx.ensure_object(dict)

        # Temporarily put the working directory on sys.path so that config
        # modules living next to the invocation point can be imported.
        working_dir = str(Path.cwd())
        inserted_cwd = working_dir not in sys.path
        if inserted_cwd:
            sys.path.insert(0, working_dir)

        try:
            resolved = resolve_config_sync(config)
            # A non-string sequence is treated as several configs; anything
            # else is wrapped so downstream code always sees a list.
            is_config_sequence = isinstance(resolved, Sequence) and not isinstance(resolved, str)
            ctx.obj["configs"] = list(resolved) if is_config_sequence else [resolved]
            ctx.obj["validate_config"] = validate_config

            if validate_config:
                console.print(f"[green]✓[/] Successfully loaded {len(ctx.obj['configs'])} config(s)")
                for index, loaded_config in enumerate(ctx.obj["configs"]):
                    display_name = loaded_config.bind_key or f"config-{index}"
                    type_name = type(loaded_config).__name__
                    mode_hint = "[dim cyan](async-capable)[/]" if loaded_config.is_async else "[dim](sync)[/]"
                    console.print(f" [dim]•[/] {display_name}: {type_name} {mode_hint}")
        except (ImportError, ConfigResolverError) as exc:
            console.print(f"[red]Error loading config: {exc}[/]")
            ctx.exit(1)
        finally:
            # Undo the sys.path change, but only when our entry is still the first one.
            if inserted_cwd and working_dir in sys.path and sys.path[0] == working_dir:
                sys.path.remove(working_dir)

    return sqlspec_group
def _ensure_click_context() -> "click.Context":
    """Fetch the current Click context, failing loudly when there is none.

    The explicit ``None`` check exists for the benefit of type checkers.

    Returns:
        The active Click context.

    Raises:
        RuntimeError: If ``click.get_current_context()`` yields ``None``.
    """
    active = click.get_current_context()
    if active is not None:
        return cast("click.Context", active)
    # pragma: no cover - click guarantees context in commands
    msg = "SQLSpec CLI commands require an active Click context"
    raise RuntimeError(msg)
def add_migration_commands(database_group: "Group | None" = None) -> "Group":
    """Register the migration subcommands on a Click group.

    Args:
        database_group: Group to attach the commands to. When ``None``, a
            group is obtained from ``get_sqlspec_group()``.

    Returns:
        The group with the migration commands added.
    """
    from rich import get_console

    console = get_console()

    if database_group is None:
        database_group = get_sqlspec_group()

    # Reusable option decorators shared by the commands registered below.
    bind_key_option = click.option(
        "--bind-key",
        type=str,
        default=None,
        help="Specify which SQLSpec config to use by bind key",
    )
    verbose_option = click.option(
        "--verbose",
        is_flag=True,
        type=bool,
        default=False,
        help="Enable verbose output.",
    )
    no_prompt_option = click.option(
        "--no-prompt",
        is_flag=True,
        type=bool,
        default=False,
        required=False,
        show_default=True,
        help="Do not prompt for confirmation before executing the command.",
    )
    include_option = click.option(
        "--include",
        multiple=True,
        help="Include only specific configurations (can be used multiple times)",
    )
    exclude_option = click.option(
        "--exclude",
        multiple=True,
        help="Exclude specific configurations (can be used multiple times)",
    )
    dry_run_option = click.option(
        "--dry-run",
        is_flag=True,
        default=False,
        help="Show what would be executed without making changes",
    )
    execution_mode_option = click.option(
        "--execution-mode",
        type=click.Choice(["auto", "sync", "async"]),
        default="auto",
        help="Force execution mode (auto-detects by default)",
    )
    no_auto_sync_option = click.option(
        "--no-auto-sync",
        is_flag=True,
        default=False,
        help="Disable automatic version reconciliation when migrations have been renamed",
    )
def get_config_by_bind_key(
ctx: "click.Context", bind_key: str | None
) -> "AsyncDatabaseConfig[Any, Any, Any] | SyncDatabaseConfig[Any, Any, Any]":
"""Get the SQLSpec config for the specified bind key.
Args:
ctx: The click context.
bind_key: The bind key to get the config for.
Returns:
The SQLSpec config for the specified bind key.
"""
configs = ctx.obj["configs"]
if bind_key is None:
config = configs[0]
else:
config = None
for cfg in configs:
config_name = cfg.bind_key
if config_name == bind_key:
config = cfg
break
if config is None:
console.print(f"[red]No config found for bind key: {bind_key}[/]")
sys.exit(1)
return cast("AsyncDatabaseConfig[Any, Any, Any] | SyncDatabaseConfig[Any, Any, Any]", config)
def get_configs_with_migrations(ctx: "click.Context", enabled_only: bool = False) -> "list[tuple[str, Any]]":
    """Collect the loaded configs that carry a migration configuration.

    Args:
        ctx: The click context; ``ctx.obj["configs"]`` holds the loaded configs.
        enabled_only: When True, skip configs whose migration config has a
            falsy ``"enabled"`` entry (a missing entry counts as enabled).

    Returns:
        ``(config_name, config)`` tuples, where the name is the config's
        ``bind_key`` or, when that is falsy, its class name.
    """
    selected = []
    for candidate in ctx.obj["configs"]:
        settings = candidate.migration_config
        if not settings:
            # No (or empty) migration config: nothing to migrate for this one.
            continue
        is_enabled = settings.get("enabled", True)
        if enabled_only and not is_enabled:
            continue
        label = candidate.bind_key or str(type(candidate).__name__)
        selected.append((label, candidate))
    return selected
def filter_configs(
    configs: "list[tuple[str, Any]]", include: "tuple[str, ...]", exclude: "tuple[str, ...]"
) -> "list[tuple[str, Any]]":
    """Narrow a config list by name using include/exclude selections.

    Args:
        configs: List of ``(config_name, config)`` tuples.
        include: Names to keep; an empty value keeps every name.
        exclude: Names to drop; applied after ``include``.

    Returns:
        The filtered list. When neither ``include`` nor ``exclude`` is
        given, the input list itself is returned.
    """
    if not include and not exclude:
        return configs

    def _is_selected(name: str) -> bool:
        # A name must pass the include list (if any) and not be excluded.
        if include and name not in include:
            return False
        return not (exclude and name in exclude)

    return [(name, config) for name, config in configs if _is_selected(name)]
async def maybe_await(result: Any) -> Any:
    """Await ``result`` when it is awaitable, otherwise return it unchanged.

    This lets one call site handle both sync and async migration commands.
    Any awaitable is accepted (coroutine objects, futures, tasks, objects
    defining ``__await__``), not only coroutine objects.

    Args:
        result: A plain value or an awaitable producing one.

    Returns:
        The awaited value for awaitables, otherwise ``result`` itself.
    """
    if inspect.isawaitable(result):
        return await result
    return result
def process_multiple_configs(
ctx: "click.Context",
bind_key: str | None,
include: "tuple[str, ...]",
exclude: "tuple[str, ...]",
dry_run: bool,
operation_name: str,
) -> "list[tuple[str, Any]] | None":
"""Process configuration selection for multi-config operations.
Args:
ctx: Click context.
bind_key: Specific bind key to target.
include: Config names to include.
exclude: Config names to exclude.
dry_run: Whether this is a dry run.
operation_name: Name of the operation for display.
Returns:
List of (config_name, config) tuples to process, or None for single config mode.
"""
# If specific bind_key requested, use single config mode
if bind_key and not include and not exclude:
return None
# Get enabled configs by default, all configs if include/exclude specified
enabled_only = not include and not exclude
migration_configs = get_configs_with_migrations(ctx, enabled_only=enabled_only)
# If only one config and no filtering, use single config mode
if len(migration_configs) <= 1 and not include and not exclude:
return None
# Apply filtering
configs_to_process = filter_configs(migration_configs, include, exclude)
if not configs_to_process:
console.print("[yellow]No configurations match the specified criteria.[/]")
return []
# Show what will be processed
if dry_run:
console.print(f"[blue]Dry run: Would {operation_name} {len(configs_to_process)} configuration(s)[/]")
for config_name, _ in configs_to_process:
console.print(f" • {config_name}")
return []
return configs_to_process
@database_group.command(name="show-current-revision", help="Shows the current revision for the database.")
@bind_key_option
@verbose_option
@include_option
@exclude_option
def show_database_revision(  # pyright: ignore[reportUnusedFunction]
    bind_key: str | None, verbose: bool, include: "tuple[str, ...]", exclude: "tuple[str, ...]"
) -> None:
    """Print the current migration revision for one or several configs.

    Args:
        bind_key: Bind key of the config to inspect (single-config mode).
        verbose: Forwarded to the migration commands' ``current()`` call.
        include: Config names to include (multi-config mode).
        exclude: Config names to exclude (multi-config mode).
    """
    from sqlspec.migrations.commands import create_migration_commands
    from sqlspec.utils.sync_tools import run_

    ctx = _ensure_click_context()

    async def _show_current_revision() -> None:
        selected = process_multiple_configs(
            ctx, bind_key, include, exclude, dry_run=False, operation_name="show current revision"
        )

        if selected is None:
            # Single-config mode.
            console.rule("[yellow]Listing current revision[/]", align="left")
            sqlspec_config = get_config_by_bind_key(ctx, bind_key)
            commands = create_migration_commands(config=sqlspec_config)
            await maybe_await(commands.current(verbose=verbose))
            return

        if not selected:
            # Nothing matched the include/exclude selection.
            return

        console.rule("[yellow]Listing current revisions for all configurations[/]", align="left")
        for config_name, config in selected:
            console.print(f"\n[blue]Configuration: {config_name}[/]")
            try:
                multi_commands: SyncMigrationCommands[Any] | AsyncMigrationCommands[Any] = (
                    create_migration_commands(config=config)
                )
                await maybe_await(multi_commands.current(verbose=verbose))
            except Exception as exc:
                # Best effort: report the failure and continue with the next config.
                console.print(f"[red]✗ Failed to get current revision for {config_name}: {exc}[/]")

    run_(_show_current_revision)()
@database_group.command(name="downgrade", help="Downgrade database to a specific revision.")
@bind_key_option
@no_prompt_option
@include_option
@exclude_option
@dry_run_option
@click.argument("revision", type=str, default="-1")
def downgrade_database(  # pyright: ignore[reportUnusedFunction]
    bind_key: str | None,
    revision: str,
    no_prompt: bool,
    include: "tuple[str, ...]",
    exclude: "tuple[str, ...]",
    dry_run: bool,
) -> None:
    """Downgrade the database to a specific revision.

    Args:
        bind_key: Bind key of the config to downgrade (single-config mode).
        revision: Target revision; the CLI argument defaults to ``"-1"``.
        no_prompt: Skip the confirmation prompt when True.
        include: Config names to include (multi-config mode).
        exclude: Config names to exclude (multi-config mode).
        dry_run: Forwarded to config selection and to ``downgrade()``.
    """
    from rich.prompt import Confirm

    from sqlspec.migrations.commands import create_migration_commands
    from sqlspec.utils.sync_tools import run_

    ctx = _ensure_click_context()

    async def _downgrade_database() -> None:
        # None means single-config mode; a list means multi-config mode.
        configs_to_process = process_multiple_configs(
            ctx, bind_key, include, exclude, dry_run=dry_run, operation_name=f"downgrade to {revision}"
        )

        if configs_to_process is not None:
            if not configs_to_process:
                # Empty list: nothing matched, or a dry run was already reported.
                return

            if not no_prompt and not Confirm.ask(
                f"[bold]Are you sure you want to downgrade {len(configs_to_process)} configuration(s) to revision {revision}?[/]"
            ):
                console.print("[yellow]Operation cancelled.[/]")
                return

            console.rule("[yellow]Starting multi-configuration downgrade process[/]", align="left")

            for config_name, config in configs_to_process:
                console.print(f"[blue]Downgrading configuration: {config_name}[/]")
                try:
                    migration_commands: SyncMigrationCommands[Any] | AsyncMigrationCommands[Any] = (
                        create_migration_commands(config=config)
                    )
                    await maybe_await(migration_commands.downgrade(revision=revision, dry_run=dry_run))
                    console.print(f"[green]✓ Successfully downgraded: {config_name}[/]")
                except Exception as e:
                    # Best effort: report the failure and continue with the next config.
                    console.print(f"[red]✗ Failed to downgrade {config_name}: {e}[/]")
        else:
            # Single config operation
            console.rule("[yellow]Starting database downgrade process[/]", align="left")
            input_confirmed = (
                True
                if no_prompt
                else Confirm.ask(f"Are you sure you want to downgrade the database to the `{revision}` revision?")
            )
            if input_confirmed:
                sqlspec_config = get_config_by_bind_key(ctx, bind_key)
                migration_commands = create_migration_commands(config=sqlspec_config)
                await maybe_await(migration_commands.downgrade(revision=revision, dry_run=dry_run))

    run_(_downgrade_database)()
@database_group.command(name="upgrade", help="Upgrade database to a specific revision.")
@bind_key_option
@no_prompt_option
@include_option
@exclude_option
@dry_run_option
@execution_mode_option
@no_auto_sync_option
@click.argument("revision", type=str, default="head")
def upgrade_database(  # pyright: ignore[reportUnusedFunction]
    bind_key: str | None,
    revision: str,
    no_prompt: bool,
    include: "tuple[str, ...]",
    exclude: "tuple[str, ...]",
    dry_run: bool,
    execution_mode: str,
    no_auto_sync: bool,
) -> None:
    """Upgrade the database to the requested revision.

    Args:
        bind_key: Bind key of the config to upgrade (single-config mode).
        revision: Target revision; the CLI argument defaults to ``"head"``.
        no_prompt: Skip the confirmation prompt when True.
        include: Config names to include (multi-config mode).
        exclude: Config names to exclude (multi-config mode).
        dry_run: Forwarded to config selection and to ``upgrade()``.
        execution_mode: Printed when not ``"auto"``. NOTE(review): this block
            does not pass it on to the migration commands.
        no_auto_sync: When True, ``upgrade()`` is called with ``auto_sync=False``.
    """
    from rich.prompt import Confirm

    from sqlspec.migrations.commands import create_migration_commands
    from sqlspec.utils.sync_tools import run_

    ctx = _ensure_click_context()
    auto_sync = not no_auto_sync

    async def _upgrade_database() -> None:
        if execution_mode != "auto":
            console.print(f"[dim]Execution mode: {execution_mode}[/]")

        selected = process_multiple_configs(
            ctx, bind_key, include, exclude, dry_run, operation_name=f"upgrade to {revision}"
        )

        if selected is None:
            # Single-config mode.
            console.rule("[yellow]Starting database upgrade process[/]", align="left")
            confirmed = no_prompt or Confirm.ask(
                f"[bold]Are you sure you want migrate the database to the `{revision}` revision?[/]"
            )
            if confirmed:
                sqlspec_config = get_config_by_bind_key(ctx, bind_key)
                commands = create_migration_commands(config=sqlspec_config)
                await maybe_await(commands.upgrade(revision=revision, auto_sync=auto_sync, dry_run=dry_run))
            return

        if not selected:
            # Empty list: nothing matched, or a dry run was already reported.
            return

        if not (
            no_prompt
            or Confirm.ask(
                f"[bold]Are you sure you want to upgrade {len(selected)} configuration(s) to revision {revision}?[/]"
            )
        ):
            console.print("[yellow]Operation cancelled.[/]")
            return

        console.rule("[yellow]Starting multi-configuration upgrade process[/]", align="left")
        for config_name, config in selected:
            console.print(f"[blue]Upgrading configuration: {config_name}[/]")
            try:
                multi_commands: SyncMigrationCommands[Any] | AsyncMigrationCommands[Any] = (
                    create_migration_commands(config=config)
                )
                await maybe_await(multi_commands.upgrade(revision=revision, auto_sync=auto_sync, dry_run=dry_run))
                console.print(f"[green]✓ Successfully upgraded: {config_name}[/]")
            except Exception as exc:
                # Best effort: report the failure and continue with the next config.
                console.print(f"[red]✗ Failed to upgrade {config_name}: {exc}[/]")

    run_(_upgrade_database)()
@database_group.command(help="Stamp the revision table with the given revision")
@click.argument("revision", type=str)
@bind_key_option
def stamp(bind_key: str | None, revision: str) -> None:  # pyright: ignore[reportUnusedFunction]
    """Record ``revision`` in the revision table.

    Args:
        bind_key: Bind key of the config to stamp; ``None`` selects the
            first loaded config.
        revision: Revision passed to the migration commands' ``stamp()``.
    """
    from sqlspec.migrations.commands import create_migration_commands
    from sqlspec.utils.sync_tools import run_

    ctx = _ensure_click_context()

    async def _stamp() -> None:
        target_config = get_config_by_bind_key(ctx, bind_key)
        commands = create_migration_commands(config=target_config)
        pending = commands.stamp(revision=revision)
        await maybe_await(pending)

    run_(_stamp)()
@database_group.command(name="init", help="Initialize migrations for the project.")
@bind_key_option
@click.argument("directory", default=None, required=False)
@click.option(
    # On/off pair: a lone ``--package`` flag defaulting to True could never be disabled.
    "--package/--no-package",
    default=True,
    help="Create `__init__.py` for created folder",
)
@no_prompt_option
def init_sqlspec(  # pyright: ignore[reportUnusedFunction]
    bind_key: str | None, directory: str | None, package: bool, no_prompt: bool
) -> None:
    """Initialize the database migrations.

    Args:
        bind_key: Bind key of the config to initialize; when ``None`` every
            loaded config is initialized.
        directory: Target directory. When ``None``, each config's
            ``script_location`` is used, falling back to ``"migrations"``.
        package: Forwarded to ``init()``; ``--no-package`` turns it off.
        no_prompt: Skip the confirmation prompt when True.
    """
    from rich.prompt import Confirm

    from sqlspec.migrations.commands import create_migration_commands
    from sqlspec.utils.sync_tools import run_

    ctx = _ensure_click_context()

    async def _init_sqlspec() -> None:
        console.rule("[yellow]Initializing database migrations.", align="left")
        input_confirmed = (
            True
            if no_prompt
            else Confirm.ask("[bold]Are you sure you want initialize migrations for the project?[/]")
        )
        if not input_confirmed:
            return

        configs = [get_config_by_bind_key(ctx, bind_key)] if bind_key is not None else ctx.obj["configs"]
        for config in configs:
            # ``migration_config`` may be missing or None; treat both as empty.
            migration_config = getattr(config, "migration_config", None) or {}
            target_directory = (
                str(migration_config.get("script_location", "migrations")) if directory is None else directory
            )
            migration_commands = create_migration_commands(config=config)
            await maybe_await(migration_commands.init(directory=target_directory, package=package))

    run_(_init_sqlspec)()
@database_group.command(
    name="create-migration", aliases=["make-migration"], help="Create a new migration revision."
)
@bind_key_option
@click.option("-m", "--message", default=None, help="Revision message")
@click.option(
    "--format",
    "--file-type",
    "file_format",
    type=click.Choice(["sql", "py"]),
    default=None,
    help="File format for the generated migration (defaults to template profile)",
)
@no_prompt_option
def create_revision(  # pyright: ignore[reportUnusedFunction]
    bind_key: str | None, message: str | None, file_format: str | None, no_prompt: bool
) -> None:
    """Create a new migration revision file.

    Args:
        bind_key: Bind key of the config to create the revision for.
        message: Revision message. When omitted, the user is prompted,
            or ``"new migration"`` is used if ``no_prompt`` is set.
        file_format: ``"sql"`` or ``"py"``; ``None`` is forwarded when the
            option was not supplied.
        no_prompt: Do not prompt for a missing message.
    """
    from rich.prompt import Prompt

    from sqlspec.migrations.commands import create_migration_commands
    from sqlspec.utils.sync_tools import run_

    ctx = _ensure_click_context()

    async def _create_revision() -> None:
        console.rule("[yellow]Creating new migration revision[/]", align="left")

        if message is not None:
            revision_message = message
        elif no_prompt:
            revision_message = "new migration"
        else:
            revision_message = Prompt.ask("Please enter a message describing this revision")

        target_config = get_config_by_bind_key(ctx, bind_key)

        # Forward the format only when it did not come from the option's default.
        format_came_from_default = ctx.get_parameter_source("file_format") is ParameterSource.DEFAULT
        chosen_format = None if format_came_from_default else file_format

        commands = create_migration_commands(config=target_config)
        await maybe_await(commands.revision(message=revision_message, file_type=chosen_format))

    run_(_create_revision)()
@database_group.command(name="fix", help="Convert timestamp migrations to sequential format.")
@bind_key_option
@dry_run_option
@click.option("--yes", is_flag=True, help="Skip confirmation prompt")
@click.option("--no-database", is_flag=True, help="Skip database record updates")
def fix_migrations(  # pyright: ignore[reportUnusedFunction]
    bind_key: str | None, dry_run: bool, yes: bool, no_database: bool
) -> None:
    """Run the migration ``fix`` command for one config.

    Args:
        bind_key: Bind key of the config to operate on; ``None`` selects
            the first loaded config.
        dry_run: Forwarded to ``fix()``.
        yes: Forwarded to ``fix()``.
        no_database: When True, ``fix()`` is called with ``update_database=False``.
    """
    from sqlspec.migrations.commands import create_migration_commands
    from sqlspec.utils.sync_tools import run_

    ctx = _ensure_click_context()
    update_database = not no_database

    async def _fix_migrations() -> None:
        console.rule("[yellow]Migration Fix Command[/]", align="left")
        target_config = get_config_by_bind_key(ctx, bind_key)
        commands = create_migration_commands(config=target_config)
        pending = commands.fix(dry_run=dry_run, update_database=update_database, yes=yes)
        await maybe_await(pending)

    run_(_fix_migrations)()
@database_group.command(name="show-config", help="Show all configurations with migrations enabled.")
@bind_key_option
def show_config(bind_key: str | None = None) -> None:  # pyright: ignore[reportUnusedFunction]
    """Print a table of the configs that carry a migration configuration.

    Args:
        bind_key: When given, restrict the table to configs with this bind
            key; otherwise list every config that has a migration config.
    """
    from rich.table import Table

    ctx = _ensure_click_context()

    if bind_key is None:
        migration_configs = get_configs_with_migrations(ctx)
    else:
        # Called for validation only: it prints an error and exits when the
        # bind key is unknown. The matching entries are collected below.
        get_config_by_bind_key(ctx, bind_key)
        migration_configs = [
            (candidate.bind_key, candidate)
            for candidate in ctx.obj["configs"]
            if candidate.bind_key == bind_key
            and hasattr(candidate, "migration_config")
            and candidate.migration_config
        ]

    if not migration_configs:
        console.print("[yellow]No configurations with migrations detected.[/]")
        return

    table = Table(title="Migration Configurations")
    for heading, colour in (
        ("Configuration Name", "cyan"),
        ("Migration Path", "blue"),
        ("Status", "green"),
    ):
        table.add_column(heading, style=colour)

    for config_name, config in migration_configs:
        settings = getattr(config, "migration_config", {})
        location = settings.get("script_location", "migrations")
        table.add_row(config_name, str(location), "Migration Enabled")

    console.print(table)
    console.print(f"[blue]Found {len(migration_configs)} configuration(s) with migrations enabled.[/]")
return database_group