-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Add pipeline API #9459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pipeline API #9459
Changes from 1 commit
e7eb43c
7d78407
6a1622d
947e920
b3fe1c4
a6d56cc
42505df
b2855d0
aafb856
0bd8b39
697833f
d767731
8abb6e4
217b11d
89c46a1
6e91a32
8e4d535
ada5853
8742e9e
f55b6e1
7132bae
0444fc9
1a8e505
d979841
fadf3bb
1699f35
d0a9372
bed0752
eb61549
a18a4df
34663fe
dff9ad9
479ab3c
7b49219
51bcad6
13b1721
b8573b5
888c4ed
141c8b6
9d4194b
128d4ea
1c7302d
88dcb75
19a3ee6
0652472
4ccf4e5
bad0a1a
a9d1099
14e9944
42a2708
021604f
38a2730
0c36b7c
8d46b21
a46c2e3
7386d69
dc07b50
cbb216b
581cbe8
c3a008f
26c5325
166df3d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,14 +47,14 @@ class _Transform: | |
|
|
||
| @dataclass(**slots_true) | ||
| class _PipelineOr: | ||
| left: Pipeline[Any, Any] | ||
| right: Pipeline[Any, Any] | ||
| left: _Pipeline[Any, Any] | ||
| right: _Pipeline[Any, Any] | ||
|
|
||
|
|
||
| @dataclass(**slots_true) | ||
| class _PipelineAnd: | ||
| left: Pipeline[Any, Any] | ||
| right: Pipeline[Any, Any] | ||
| left: _Pipeline[Any, Any] | ||
| right: _Pipeline[Any, Any] | ||
|
|
||
|
|
||
| _ConstraintAnnotation = Union[ | ||
|
|
@@ -88,93 +88,93 @@ class _FieldTypeMarker: | |
|
|
||
|
|
||
| @dataclass(**slots_true) | ||
| class Pipeline(Generic[_InT, _OutT]): | ||
| class _Pipeline(Generic[_InT, _OutT]): | ||
| """Abstract representation of a chain of validation, transformation, and parsing steps.""" | ||
|
|
||
| _steps: list[_Step] | ||
sydney-runkle marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| def transform( | ||
| self, | ||
| func: Callable[[_OutT], _NewOutT], | ||
| ) -> Pipeline[_InT, _NewOutT]: | ||
| ) -> _Pipeline[_InT, _NewOutT]: | ||
| """Transform the output of the previous step. | ||
|
|
||
| If used as the first step in a pipeline, the type of the field is used. | ||
| That is, the transformation is applied to after the value is parsed to the field's type. | ||
| """ | ||
| return Pipeline[_InT, _NewOutT](self._steps + [_Transform(func)]) | ||
| return _Pipeline[_InT, _NewOutT](self._steps + [_Transform(func)]) | ||
|
|
||
| @overload | ||
| def validate_as(self, tp: type[_NewOutT], *, strict: bool = ...) -> Pipeline[_InT, _NewOutT]: | ||
| def validate_as(self, tp: type[_NewOutT], *, strict: bool = ...) -> _Pipeline[_InT, _NewOutT]: | ||
| ... | ||
|
|
||
| @overload | ||
| def validate_as(self, *, strict: bool = ...) -> Pipeline[_InT, Any]: | ||
| def validate_as(self, *, strict: bool = ...) -> _Pipeline[_InT, Any]: | ||
| ... | ||
|
|
||
| def validate_as(self, tp: Any = _FieldTypeMarker, *, strict: bool = False) -> Pipeline[_InT, Any]: | ||
| def validate_as(self, tp: Any = _FieldTypeMarker, *, strict: bool = False) -> _Pipeline[_InT, Any]: | ||
| """Validate / parse the input into a new type. | ||
|
|
||
| If not type is provided, the type of the field is used. | ||
|
|
||
| Types are parsed in Pydantic's `lax` mode by default, | ||
| but you can enable `strict` mode by passing `strict=True`. | ||
| """ | ||
| return Pipeline[_InT, Any](self._steps + [_ValidateAs(tp, strict=strict)]) | ||
| return _Pipeline[_InT, Any](self._steps + [_ValidateAs(tp, strict=strict)]) | ||
|
|
||
| def validate_as_deferred(self, func: Callable[[], type[_NewOutT]]) -> Pipeline[_InT, _NewOutT]: | ||
| def validate_as_deferred(self, func: Callable[[], type[_NewOutT]]) -> _Pipeline[_InT, _NewOutT]: | ||
| """Parse the input into a new type, deferring resolution of the type until the current class | ||
| is fully defined. | ||
|
|
||
| This is useful when you need to reference the class in it's own type annotations. | ||
| """ | ||
| return Pipeline[_InT, _NewOutT](self._steps + [_ValidateAsDefer(func)]) | ||
| return _Pipeline[_InT, _NewOutT](self._steps + [_ValidateAsDefer(func)]) | ||
|
|
||
| # constraints | ||
| @overload | ||
| def constrain(self: Pipeline[_InT, _NewOutGe], constraint: annotated_types.Ge) -> Pipeline[_InT, _NewOutGe]: | ||
| def constrain(self: _Pipeline[_InT, _NewOutGe], constraint: annotated_types.Ge) -> _Pipeline[_InT, _NewOutGe]: | ||
| ... | ||
|
|
||
| @overload | ||
| def constrain(self: Pipeline[_InT, _NewOutGt], constraint: annotated_types.Gt) -> Pipeline[_InT, _NewOutGt]: | ||
| def constrain(self: _Pipeline[_InT, _NewOutGt], constraint: annotated_types.Gt) -> _Pipeline[_InT, _NewOutGt]: | ||
| ... | ||
|
|
||
| @overload | ||
| def constrain(self: Pipeline[_InT, _NewOutLe], constraint: annotated_types.Le) -> Pipeline[_InT, _NewOutLe]: | ||
| def constrain(self: _Pipeline[_InT, _NewOutLe], constraint: annotated_types.Le) -> _Pipeline[_InT, _NewOutLe]: | ||
| ... | ||
|
|
||
| @overload | ||
| def constrain(self: Pipeline[_InT, _NewOutLt], constraint: annotated_types.Lt) -> Pipeline[_InT, _NewOutLt]: | ||
| def constrain(self: _Pipeline[_InT, _NewOutLt], constraint: annotated_types.Lt) -> _Pipeline[_InT, _NewOutLt]: | ||
| ... | ||
|
|
||
| @overload | ||
| def constrain(self: Pipeline[_InT, _NewOutLen], constraint: annotated_types.Len) -> Pipeline[_InT, _NewOutLen]: | ||
| def constrain(self: _Pipeline[_InT, _NewOutLen], constraint: annotated_types.Len) -> _Pipeline[_InT, _NewOutLen]: | ||
| ... | ||
|
|
||
| @overload | ||
| def constrain( | ||
| self: Pipeline[_InT, _NewOutDiv], constraint: annotated_types.MultipleOf | ||
| ) -> Pipeline[_InT, _NewOutDiv]: | ||
| self: _Pipeline[_InT, _NewOutDiv], constraint: annotated_types.MultipleOf | ||
| ) -> _Pipeline[_InT, _NewOutDiv]: | ||
| ... | ||
|
|
||
| @overload | ||
| def constrain( | ||
| self: Pipeline[_InT, _NewOutDatetime], constraint: annotated_types.Timezone | ||
| ) -> Pipeline[_InT, _NewOutDatetime]: | ||
| self: _Pipeline[_InT, _NewOutDatetime], constraint: annotated_types.Timezone | ||
| ) -> _Pipeline[_InT, _NewOutDatetime]: | ||
| ... | ||
|
|
||
| @overload | ||
| def constrain(self: Pipeline[_InT, _OutT], constraint: annotated_types.Predicate) -> Pipeline[_InT, _OutT]: | ||
| def constrain(self: _Pipeline[_InT, _OutT], constraint: annotated_types.Predicate) -> _Pipeline[_InT, _OutT]: | ||
| ... | ||
|
|
||
| @overload | ||
| def constrain( | ||
| self: Pipeline[_InT, _NewOutInterval], constraint: annotated_types.Interval | ||
| ) -> Pipeline[_InT, _NewOutInterval]: | ||
| self: _Pipeline[_InT, _NewOutInterval], constraint: annotated_types.Interval | ||
| ) -> _Pipeline[_InT, _NewOutInterval]: | ||
| ... | ||
|
|
||
| @overload | ||
| def constrain(self: Pipeline[_InT, _NewOutT], constraint: Pattern[str]) -> Pipeline[_InT, _NewOutT]: | ||
| def constrain(self: _Pipeline[_InT, _NewOutT], constraint: Pattern[str]) -> _Pipeline[_InT, _NewOutT]: | ||
| ... | ||
|
|
||
| def constrain(self, constraint: _ConstraintAnnotation) -> Any: | ||
|
|
@@ -185,72 +185,72 @@ def constrain(self, constraint: _ConstraintAnnotation) -> Any: | |
| Most of the time you'll be calling a shortcut method like `gt`, `lt`, `len`, etc | ||
| so you don't need to call this directly. | ||
| """ | ||
| return Pipeline[_InT, _OutT](self._steps + [_Constraint(constraint)]) | ||
| return _Pipeline[_InT, _OutT](self._steps + [_Constraint(constraint)]) | ||
|
|
||
| def gt(self: Pipeline[_InT, _NewOutGt], gt: _NewOutGt) -> Pipeline[_InT, _NewOutGt]: | ||
| def gt(self: _Pipeline[_InT, _NewOutGt], gt: _NewOutGt) -> _Pipeline[_InT, _NewOutGt]: | ||
| """Constrain a value to be greater than a certain value.""" | ||
| return self.constrain(annotated_types.Gt(gt)) | ||
|
|
||
| def lt(self: Pipeline[_InT, _NewOutLt], lt: _NewOutLt) -> Pipeline[_InT, _NewOutLt]: | ||
| def lt(self: _Pipeline[_InT, _NewOutLt], lt: _NewOutLt) -> _Pipeline[_InT, _NewOutLt]: | ||
| """Constrain a value to be less than a certain value.""" | ||
| return self.constrain(annotated_types.Lt(lt)) | ||
|
|
||
| def ge(self: Pipeline[_InT, _NewOutGe], ge: _NewOutGe) -> Pipeline[_InT, _NewOutGe]: | ||
| def ge(self: _Pipeline[_InT, _NewOutGe], ge: _NewOutGe) -> _Pipeline[_InT, _NewOutGe]: | ||
| """Constrain a value to be greater than or equal to a certain value.""" | ||
| return self.constrain(annotated_types.Ge(ge)) | ||
|
|
||
| def le(self: Pipeline[_InT, _NewOutLe], le: _NewOutLe) -> Pipeline[_InT, _NewOutLe]: | ||
| def le(self: _Pipeline[_InT, _NewOutLe], le: _NewOutLe) -> _Pipeline[_InT, _NewOutLe]: | ||
| """Constrain a value to be less than or equal to a certain value.""" | ||
| return self.constrain(annotated_types.Le(le)) | ||
|
|
||
| def len(self: Pipeline[_InT, _NewOutLen], min_len: int, max_len: int | None = None) -> Pipeline[_InT, _NewOutLen]: | ||
| def len(self: _Pipeline[_InT, _NewOutLen], min_len: int, max_len: int | None = None) -> _Pipeline[_InT, _NewOutLen]: | ||
| """Constrain a value to have a certain length.""" | ||
| return self.constrain(annotated_types.Len(min_len, max_len)) | ||
|
|
||
| def multiple_of(self: Pipeline[_InT, _NewOutDiv], multiple_of: _NewOutDiv) -> Pipeline[_InT, _NewOutDiv]: | ||
| def multiple_of(self: _Pipeline[_InT, _NewOutDiv], multiple_of: _NewOutDiv) -> _Pipeline[_InT, _NewOutDiv]: | ||
| """Constrain a value to be a multiple of a certain number.""" | ||
| return self.constrain(annotated_types.MultipleOf(multiple_of)) | ||
|
|
||
| def predicate(self: Pipeline[_InT, _NewOutT], func: Callable[[_NewOutT], bool]) -> Pipeline[_InT, _NewOutT]: | ||
| def predicate(self: _Pipeline[_InT, _NewOutT], func: Callable[[_NewOutT], bool]) -> _Pipeline[_InT, _NewOutT]: | ||
sydney-runkle marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """Constrain a value to meet a certain predicate.""" | ||
| return self.constrain(annotated_types.Predicate(func)) | ||
|
|
||
| def not_in(self: Pipeline[_InT, _OutT], values: Container[_OutT]) -> Pipeline[_InT, _OutT]: | ||
| def not_in(self: _Pipeline[_InT, _OutT], values: Container[_OutT]) -> _Pipeline[_InT, _OutT]: | ||
| """Constrain a value to not be in a certain set.""" | ||
| return self.predicate(partial(operator.__contains__, values)) | ||
|
|
||
| def in_(self: Pipeline[_InT, _OutT], values: Container[_OutT]) -> Pipeline[_InT, _OutT]: | ||
| def in_(self: _Pipeline[_InT, _OutT], values: Container[_OutT]) -> _Pipeline[_InT, _OutT]: | ||
| """Constrain a value to be in a certain set.""" | ||
| return self.predicate(partial(operator.__contains__, values)) | ||
|
|
||
| def not_eq(self: Pipeline[_InT, _OutT], value: _OutT) -> Pipeline[_InT, _OutT]: | ||
| def not_eq(self: _Pipeline[_InT, _OutT], value: _OutT) -> _Pipeline[_InT, _OutT]: | ||
| """Constrain a value to not be equal to a certain value.""" | ||
| return self.predicate(partial(operator.__ne__, value)) | ||
|
|
||
| def eq(self: Pipeline[_InT, _OutT], value: _OutT) -> Pipeline[_InT, _OutT]: | ||
| def eq(self: _Pipeline[_InT, _OutT], value: _OutT) -> _Pipeline[_InT, _OutT]: | ||
| """Constrain a value to be equal to a certain value.""" | ||
| return self.predicate(partial(operator.__eq__, value)) | ||
|
|
||
| # timezone methods | ||
| @property | ||
| def dt(self: Pipeline[_InT, _NewOutDatetime]) -> _DateTimeValidator: | ||
| def dt(self: _Pipeline[_InT, _NewOutDatetime]) -> _DateTimeValidator: | ||
| return _DateTimeValidator(self._steps) | ||
|
|
||
| # string methods | ||
| @property | ||
| def str(self: Pipeline[_InT, _NewOutStr]) -> _StringValidator: | ||
| def str(self: _Pipeline[_InT, _NewOutStr]) -> _StringValidator: | ||
| return _StringValidator(self._steps) | ||
|
|
||
| # operators | ||
| def otherwise(self, other: Pipeline[_OtherIn, _OtherOut]) -> Pipeline[_InT | _OtherIn, _OutT | _OtherOut]: | ||
| def otherwise(self, other: _Pipeline[_OtherIn, _OtherOut]) -> _Pipeline[_InT | _OtherIn, _OutT | _OtherOut]: | ||
| """Combine two validation chains, returning the result of the first chain if it succeeds, and the second chain if it fails.""" | ||
| return Pipeline([_PipelineOr(self, other)]) | ||
| return _Pipeline([_PipelineOr(self, other)]) | ||
|
|
||
| __or__ = otherwise | ||
|
|
||
| def then(self, other: Pipeline[_OtherIn, _OtherOut]) -> Pipeline[_InT | _OtherIn, _OutT | _OtherOut]: | ||
| def then(self, other: _Pipeline[_OtherIn, _OtherOut]) -> _Pipeline[_InT | _OtherIn, _OutT | _OtherOut]: | ||
| """Pipe the result of one validation chain into another.""" | ||
| return Pipeline([_PipelineAnd(self, other)]) | ||
| return _Pipeline([_PipelineAnd(self, other)]) | ||
|
|
||
| __and__ = then | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I understand why this API was added — as long as the input and output types of a pipeline are the same, then it's basically equivalent to do them in order. However, I'll note that sequencing the items like this may end up being unintuitive, in particular if you expect to get an error for each failure in the case of multiple independent validators, rather than just the first failure. I understand it's hard to "fix" that given that transformations are possible though.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I take it that you mean that for I also think you’re saying that for the case where you have a chain of constraints you could error for all of them eg The one improvement we could make is “collapsing” sequential constraints into one level eg
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, my point was that I might expect
Member
Author
6D3F
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My expectation would be the exact opposite: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm late to the conversation, but I'd expect it to short-circuit the same way it would for an if expression or any other useage (that I know of) for
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @grantmwilliams I think it works as you'd expect then, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @adriangb I've only tested it a bit, but it seems to work exactly as expected. |
||
|
|
||
|
|
@@ -269,48 +269,48 @@ def __get_pydantic_core_schema__(self, source_type: Any, handler: GetCoreSchemaH | |
| return s | ||
|
|
||
|
|
||
| validate_as = Pipeline[Any, Any]([]).validate_as | ||
| parse_defer = Pipeline[Any, Any]([]).validate_as_deferred | ||
| transform = Pipeline[Any, Any]([_ValidateAs(_FieldTypeMarker)]).transform | ||
| validate_as = _Pipeline[Any, Any]([]).validate_as | ||
| parse_defer = _Pipeline[Any, Any]([]).validate_as_deferred | ||
| transform = _Pipeline[Any, Any]([_ValidateAs(_FieldTypeMarker)]).transform | ||
|
|
||
|
|
||
| class _StringValidator(Pipeline[str, str]): | ||
| def lower(self) -> Pipeline[str, str]: | ||
| class _StringValidator(_Pipeline[str, str]): | ||
| def lower(self) -> _Pipeline[str, str]: | ||
| return self.transform(str.lower) | ||
|
|
||
| def upper(self) -> Pipeline[str, str]: | ||
| def upper(self) -> _Pipeline[str, str]: | ||
| return self.transform(str.upper) | ||
|
|
||
| def title(self) -> Pipeline[str, str]: | ||
| def title(self) -> _Pipeline[str, str]: | ||
| return self.transform(str.title) | ||
|
|
||
| def strip(self) -> Pipeline[str, str]: | ||
| def strip(self) -> _Pipeline[str, str]: | ||
| return self.transform(str.strip) | ||
|
|
||
| def pattern(self, pattern: str) -> Pipeline[str, str]: | ||
| def pattern(self, pattern: str) -> _Pipeline[str, str]: | ||
| return self.constrain(re.compile(pattern)) | ||
|
|
||
| def contains(self, substring: str) -> Pipeline[str, str]: | ||
| def contains(self, substring: str) -> _Pipeline[str, str]: | ||
| return self.predicate(lambda v: substring in v) | ||
|
|
||
| def starts_with(self, prefix: str) -> Pipeline[str, str]: | ||
| def starts_with(self, prefix: str) -> _Pipeline[str, str]: | ||
| return self.predicate(lambda v: v.startswith(prefix)) | ||
|
|
||
| def ends_with(self, suffix: str) -> Pipeline[str, str]: | ||
| def ends_with(self, suffix: str) -> _Pipeline[str, str]: | ||
| return self.predicate(lambda v: v.endswith(suffix)) | ||
|
|
||
|
|
||
| class _DateTimeValidator(Pipeline[datetime.datetime, datetime.datetime]): | ||
| def tz_naive(self) -> Pipeline[datetime.datetime, datetime.datetime]: | ||
| class _DateTimeValidator(_Pipeline[datetime.datetime, datetime.datetime]): | ||
| def tz_naive(self) -> _Pipeline[datetime.datetime, datetime.datetime]: | ||
| return self.constrain(annotated_types.Timezone(None)) | ||
|
|
||
| def tz_aware(self) -> Pipeline[datetime.datetime, datetime.datetime]: | ||
| def tz_aware(self) -> _Pipeline[datetime.datetime, datetime.datetime]: | ||
| return self.constrain(annotated_types.Timezone(...)) | ||
|
|
||
| def tz(self, tz: datetime.tzinfo) -> Pipeline[datetime.datetime, datetime.datetime]: | ||
| def tz(self, tz: datetime.tzinfo) -> _Pipeline[datetime.datetime, datetime.datetime]: | ||
| return self.constrain(annotated_types.Timezone(tz)) # type: ignore | ||
|
|
||
| def with_tz(self, tz: datetime.tzinfo | None) -> Pipeline[datetime.datetime, datetime.datetime]: | ||
| def with_tz(self, tz: datetime.tzinfo | None) -> _Pipeline[datetime.datetime, datetime.datetime]: | ||
| return self.transform(partial(datetime.datetime.replace, tzinfo=tz)) | ||
|
|
||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.