Move core schema generation logic for path types inside the `GenerateSchema` class by sydney-runkle · Pull Request #10846 · pydantic/pydantic · GitHub
57 changes: 53 additions & 4 deletions pydantic/_internal/_generate_schema.py
@@ -482,6 +482,54 @@ def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType:
            },
        )

    def _path_schema(self, tp: Any, path_type: Any) -> CoreSchema:
        if tp is os.PathLike and path_type not in {str, bytes, Any}:
            # TODO: maybe custom code here?
            raise PydanticUserError(
                '`os.PathLike` can only be used with `str`, `bytes` or `Any`', code='schema-for-unknown-type'
            )

        path_constructor = pathlib.PurePath if tp is os.PathLike else tp
        constrained_schema = core_schema.bytes_schema() if (path_type is bytes) else core_schema.str_schema()

        def path_validator(input_value: str | bytes) -> os.PathLike[Any]:  # type: ignore
            try:
                if path_type is bytes:
                    if isinstance(input_value, bytes):
                        try:
                            input_value = input_value.decode()
                        except UnicodeDecodeError as e:
                            raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e
                    else:
                        raise PydanticCustomError('bytes_type', 'Input must be bytes')
                elif not isinstance(input_value, str):
                    raise PydanticCustomError('path_type', 'Input is not a valid path')

                return path_constructor(input_value)  # type: ignore
            except TypeError as e:
                raise PydanticCustomError('path_type', 'Input is not a valid path') from e

        instance_schema = core_schema.json_or_python_schema(
            json_schema=core_schema.no_info_after_validator_function(path_validator, constrained_schema),
            python_schema=core_schema.is_instance_schema(tp),
        )

        schema = core_schema.lax_or_strict_schema(
            lax_schema=core_schema.union_schema(
                [
                    instance_schema,
                    core_schema.no_info_after_validator_function(path_validator, constrained_schema),
                ],
                custom_error_type='path_type',
                custom_error_message=f'Input is not a valid path for {tp}',
                strict=True,
            ),
            strict_schema=instance_schema,
            serialization=core_schema.to_string_ser_schema(),
            metadata={'pydantic_js_functions': [lambda source, handler: {**handler(source), 'format': 'path'}]},
        )
        return schema

    def _fraction_schema(self) -> CoreSchema:
        """Support for [`fractions.Fraction`][fractions.Fraction]."""
        from ._validators import fraction_validator
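For reference, a minimal sketch of the validation and serialization behavior the new `_path_schema` is expected to produce, exercised through pydantic's public `TypeAdapter` API (not part of the diff; default lax mode assumed):

```python
import pathlib

from pydantic import TypeAdapter

ta = TypeAdapter(pathlib.Path)

# Python mode, lax: an existing Path matches the is_instance branch of the union.
assert ta.validate_python(pathlib.Path('foo/bar')) == pathlib.Path('foo/bar')

# Python mode, lax: a str falls through the union to path_validator.
assert ta.validate_python('foo/bar') == pathlib.Path('foo/bar')

# JSON mode: input is always a string, handled by the same after-validator.
assert ta.validate_json('"foo/bar"') == pathlib.Path('foo/bar')

# to_string_ser_schema means paths serialize back to strings ('foo/bar' on POSIX).
assert ta.dump_python(pathlib.Path('foo/bar'), mode='json') == 'foo/bar'
```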
@@ -946,6 +994,8 @@ def match_type(self, obj: Any) -> core_schema.CoreSchema:  # noqa: C901
            return self._sequence_schema(Any)
        elif obj in DICT_TYPES:
            return self._dict_schema(Any, Any)
        elif obj in PATH_TYPES:
            return self._path_schema(obj, Any)
        elif _typing_extra.is_type_alias_type(obj):
            return self._type_alias_type_schema(obj)
        elif obj is type:
@@ -1024,6 +1074,8 @@ def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema:  # noqa: C901
            return self._frozenset_schema(self._get_first_arg_or_any(obj))
        elif origin in DICT_TYPES:
            return self._dict_schema(*self._get_first_two_args_or_any(obj))
        elif origin in PATH_TYPES:
            return self._path_schema(origin, self._get_first_arg_or_any(obj))
        elif is_typeddict(origin):
            return self._typed_dict_schema(obj, origin)
        elif origin in (typing.Type, type):
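Taken together with the `match_type` branch above, bare and parametrized path types now route into the same `_path_schema`. A hedged sketch of the expected dispatch behavior: the `PydanticUserError` is the one raised at the top of `_path_schema`, and the `PurePath` result for `os.PathLike[bytes]` follows from the `path_constructor` fallback.

```python
import os
import pathlib

from pydantic import PydanticUserError, TypeAdapter

# Bare types hit match_type, so the parameter defaults to Any (str schema).
assert TypeAdapter(pathlib.PurePath).validate_python('a/b') == pathlib.PurePath('a/b')

# os.PathLike[bytes] hits _match_generic_type; bytes input is decoded and the
# value is built with pathlib.PurePath, since os.PathLike itself is abstract.
assert TypeAdapter(os.PathLike[bytes]).validate_python(b'a/b') == pathlib.PurePath('a/b')

# Parameters other than str, bytes or Any are rejected at schema-build time.
try:
    TypeAdapter(os.PathLike[int])
except PydanticUserError as e:
    assert e.code == 'schema-for-unknown-type'
```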
@@ -1988,7 +2040,6 @@ def _get_prepare_pydantic_annotations_for_known_type(
    from ._std_types_schema import (
        deque_schema_prepare_pydantic_annotations,
        mapping_like_prepare_pydantic_annotations,
        path_schema_prepare_pydantic_annotations,
    )

    # Check for hashability
@@ -2003,9 +2054,7 @@
    # not always called from match_type, but sometimes from _apply_annotations
    obj_origin = get_origin(obj) or obj

    if obj_origin in PATH_TYPES:
        return path_schema_prepare_pydantic_annotations(obj, annotations)
    elif obj_origin in DEQUE_TYPES:
    if obj_origin in DEQUE_TYPES:
        return deque_schema_prepare_pydantic_annotations(obj, annotations)
    elif obj_origin in MAPPING_TYPES:
        return mapping_like_prepare_pydantic_annotations(obj, annotations)
110 changes: 1 addition & 109 deletions pydantic/_internal/_std_types_schema.py
@@ -11,28 +11,24 @@
import collections
import collections.abc
import dataclasses
import os
import typing
from functools import partial
from typing import Any, Callable, Iterable, Tuple, TypeVar, cast

import typing_extensions
from pydantic_core import (
    CoreSchema,
    PydanticCustomError,
    core_schema,
)
from typing_extensions import get_args, get_origin

from pydantic._internal._serializers import serialize_sequence_via_list
from pydantic.errors import PydanticSchemaGenerationError
from pydantic.types import Strict

from ..json_schema import JsonSchemaValue
from . import _known_annotated_metadata, _typing_extra
from ._import_utils import import_cached_field_info
from ._internal_dataclass import slots_true
from ._schema_generation_shared import GetCoreSchemaHandler, GetJsonSchemaHandler
from ._schema_generation_shared import GetCoreSchemaHandler

FieldInfo = import_cached_field_info()

@@ -42,110 +38,6 @@
StdSchemaFunction = Callable[[GenerateSchema, type[Any]], core_schema.CoreSchema]


@dataclasses.dataclass(**slots_true)
class InnerSchemaValidator:
    """Use a fixed CoreSchema, avoiding interference from outward annotations."""

    core_schema: CoreSchema
    js_schema: JsonSchemaValue | None = None
    js_core_schema: CoreSchema | None = None
    js_schema_update: JsonSchemaValue | None = None

    def __get_pydantic_json_schema__(self, _schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue:
        if self.js_schema is not None:
            return self.js_schema
        js_schema = handler(self.js_core_schema or self.core_schema)
        if self.js_schema_update is not None:
            js_schema.update(self.js_schema_update)
        return js_schema

    def __get_pydantic_core_schema__(self, _source_type: Any, _handler: GetCoreSchemaHandler) -> CoreSchema:
        return self.core_schema

def path_schema_prepare_pydantic_annotations(
    source_type: Any, annotations: Iterable[Any]
) -> tuple[Any, list[Any]] | None:
    import pathlib

    orig_source_type: Any = get_origin(source_type) or source_type
    if (
        (source_type_args := get_args(source_type))
        and orig_source_type is os.PathLike
        and source_type_args[0] not in {str, bytes, Any}
    ):
        return None

    if orig_source_type not in {
        os.PathLike,
        pathlib.Path,
        pathlib.PurePath,
        pathlib.PosixPath,
        pathlib.PurePosixPath,
        pathlib.PureWindowsPath,
    }:
        return None

    metadata, remaining_annotations = _known_annotated_metadata.collect_known_metadata(annotations)
    _known_annotated_metadata.check_metadata(metadata, _known_annotated_metadata.STR_CONSTRAINTS, orig_source_type)

    is_first_arg_byte = source_type_args and source_type_args[0] is bytes
    construct_path = pathlib.PurePath if orig_source_type is os.PathLike else orig_source_type
    constrained_schema = (
        core_schema.bytes_schema(**metadata) if is_first_arg_byte else core_schema.str_schema(**metadata)
    )

    def path_validator(input_value: str | bytes) -> os.PathLike[Any]:  # type: ignore
        try:
            if is_first_arg_byte:
                if isinstance(input_value, bytes):
                    try:
                        input_value = input_value.decode()
                    except UnicodeDecodeError as e:
                        raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e
                else:
                    raise PydanticCustomError('bytes_type', 'Input must be bytes')
            elif not isinstance(input_value, str):
                raise PydanticCustomError('path_type', 'Input is not a valid path')

            return construct_path(input_value)
        except TypeError as e:
            raise PydanticCustomError('path_type', 'Input is not a valid path') from e

    instance_schema = core_schema.json_or_python_schema(
        json_schema=core_schema.no_info_after_validator_function(path_validator, constrained_schema),
        python_schema=core_schema.is_instance_schema(orig_source_type),
    )

    strict: bool | None = None
    for annotation in annotations:
        if isinstance(annotation, Strict):
            strict = annotation.strict

    schema = core_schema.lax_or_strict_schema(
        lax_schema=core_schema.union_schema(
            [
                instance_schema,
                core_schema.no_info_after_validator_function(path_validator, constrained_schema),
            ],
            custom_error_type='path_type',
            custom_error_message=f'Input is not a valid path for {orig_source_type}',
            strict=True,
        ),
        strict_schema=instance_schema,
        serialization=core_schema.to_string_ser_schema(),
        strict=strict,
    )

    return (
        orig_source_type,
        [
            InnerSchemaValidator(schema, js_core_schema=constrained_schema, js_schema_update={'format': 'path'}),
            *remaining_annotations,
        ],
    )


def deque_validator(
    input_value: Any, handler: core_schema.ValidatorFunctionWrapHandler, maxlen: None | int
) -> collections.deque[Any]:
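The removed block's two jobs are now handled inside `_path_schema`: the `{'format': 'path'}` JSON-schema tweak moved from `InnerSchemaValidator.js_schema_update` into `pydantic_js_functions` metadata, and `Strict` flows through the shared known-metadata handling instead of being read off the annotations list. A sketch of the end-user behavior this is expected to preserve, using public APIs only:

```python
import pathlib
from typing import Annotated

from pydantic import TypeAdapter, ValidationError
from pydantic.types import Strict

# The generated JSON schema still advertises the 'path' format.
assert TypeAdapter(pathlib.Path).json_schema() == {'format': 'path', 'type': 'string'}

# Strict() selects the strict_schema branch: only real Path instances pass.
strict_ta = TypeAdapter(Annotated[pathlib.Path, Strict()])
assert strict_ta.validate_python(pathlib.Path('x')) == pathlib.Path('x')
try:
    strict_ta.validate_python('x')  # a plain str should be rejected in strict mode
except ValidationError:
    pass
```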
25 changes: 25 additions & 0 deletions pydantic/_internal/_validators.py
@@ -6,6 +6,7 @@
from __future__ import annotations as _annotations

import math
import pathlib
import re
import typing
from decimal import Decimal
@@ -308,6 +309,11 @@ def multiple_of_validator(x: Any, multiple_of: Any) -> Any:
        raise TypeError(f"Unable to apply constraint 'multiple_of' to supplied value {x}")


# TODO: we have a sort of hacky way of supporting min_length and max_length for path like types as a consequence
# of prior design that supported out of order metadata injection during core schema generation, which we have since abandoned
# see: https://github.com/pydantic/pydantic/pull/10846/


def min_length_validator(x: Any, min_length: Any) -> Any:
    try:
        if not (len(x) >= min_length):
@@ -316,6 +322,15 @@ def min_length_validator(x: Any, min_length: Any) -> Any:
            )
        return x
    except TypeError:
        if isinstance(x, pathlib.PurePath):
            try:
                if not (len(str(x)) >= min_length):
                    raise PydanticKnownError(
                        'too_short', {'field_type': 'Value', 'min_length': min_length, 'actual_length': len(str(x))}
                    )
                return x
            except TypeError:
                pass
        raise TypeError(f"Unable to apply constraint 'min_length' to supplied value {x}")


@@ -328,6 +343,16 @@ def max_length_validator(x: Any, max_length: Any) -> Any:
            )
        return x
    except TypeError:
        if isinstance(x, pathlib.PurePath):
            try:
                if len(str(x)) > max_length:
                    raise PydanticKnownError(
                        'too_long',
                        {'field_type': 'Value', 'max_length': max_length, 'actual_length': len(str(x))},
                    )
                return x
            except TypeError:
                pass
        raise TypeError(f"Unable to apply constraint 'max_length' to supplied value {x}")
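Because `len()` is not defined for `PurePath`, both validators fall back to `len(str(x))`. A minimal sketch of the resulting behavior, assuming constraints attached via `annotated_types` (which `Field(min_length=...)` also maps to) reach these validators after path construction:

```python
import pathlib
from typing import Annotated

from annotated_types import MaxLen
from pydantic import TypeAdapter, ValidationError

ta = TypeAdapter(Annotated[pathlib.Path, MaxLen(5)])

# len(str(Path('a/b'))) == 3, within the limit, so the fallback accepts it.
assert ta.validate_python('a/b') == pathlib.Path('a/b')

try:
    ta.validate_python('far/too/long')  # str form has length 12 > 5
except ValidationError as err:
    assert err.errors()[0]['type'] == 'too_long'
```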

