diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py index c7eaafe3de..445ff728b0 100644 --- a/bigframes/core/array_value.py +++ b/bigframes/core/array_value.py @@ -405,7 +405,7 @@ def project_window_op( skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection """ # TODO: Support non-deterministic windowing - if window_spec.row_bounded or not op.order_independent: + if window_spec.is_row_bounded or not op.order_independent: if self.node.order_ambiguous and not self.session._strictly_ordered: if not self.session._allows_ambiguity: raise ValueError( diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index aba2228101..6202a34ce2 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -29,6 +29,7 @@ from google.cloud import bigquery import pyarrow as pa +from bigframes.core import utils import bigframes.core.compile.aggregate_compiler as agg_compiler import bigframes.core.compile.googlesql import bigframes.core.compile.ibis_types @@ -231,7 +232,7 @@ def aggregate( col_out: agg_compiler.compile_aggregate( aggregate, bindings, - order_by=_convert_ordering_to_table_values(table, order_by), + order_by=_convert_row_ordering_to_table_values(table, order_by), ) for aggregate, col_out in aggregations } @@ -439,7 +440,7 @@ def project_window_op( never_skip_nulls=never_skip_nulls, ) - if expression.op.order_independent and not window_spec.row_bounded: + if expression.op.order_independent and window_spec.is_unbounded: # notably percentile_cont does not support ordering clause window_spec = window_spec.without_order() window = self._ibis_window_from_spec(window_spec) @@ -517,16 +518,30 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec): # 1. 
Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed # 2. Order-independent op (aggregation, cut, rank) with range window - use ordering clause, ties allowed # 3. Order-depedenpent op (navigation functions, array_agg) or rows bounds - use total row order to break ties. - if window_spec.ordering: - order_by = _convert_ordering_to_table_values( + if window_spec.is_row_bounded: + if not window_spec.ordering: + # If window spec has following or preceding bounds, we need to apply an unambiguous ordering. + raise ValueError("No ordering provided for ordered analytic function") + order_by = _convert_row_ordering_to_table_values( self._column_names, window_spec.ordering, ) - elif window_spec.row_bounded: - # If window spec has following or preceding bounds, we need to apply an unambiguous ordering. - raise ValueError("No ordering provided for ordered analytic function") - else: + + elif window_spec.is_range_bounded: + order_by = [ + _convert_range_ordering_to_table_value( + self._column_names, + window_spec.ordering[0], + ) + ] + # The remaining branches handle unbounded windows + elif window_spec.ordering: # Unbound grouping window. Suitable for aggregations but not for analytic function application. 
+ order_by = _convert_row_ordering_to_table_values( + self._column_names, + window_spec.ordering, + ) + else: order_by = None window = bigframes_vendored.ibis.window(order_by=order_by, group_by=group_by) @@ -551,7 +566,7 @@ def is_window(column: ibis_types.Value) -> bool: return any(isinstance(op, ibis_ops.WindowFunction) for op in matches) -def _convert_ordering_to_table_values( +def _convert_row_ordering_to_table_values( value_lookup: typing.Mapping[str, ibis_types.Value], ordering_columns: typing.Sequence[OrderingExpression], ) -> typing.Sequence[ibis_types.Value]: @@ -579,6 +594,30 @@ def _convert_ordering_to_table_values( return ordering_values +def _convert_range_ordering_to_table_value( + value_lookup: typing.Mapping[str, ibis_types.Value], + ordering_column: OrderingExpression, +) -> ibis_types.Value: + """Converts the ordering for range windows to Ibis references. + + Note that this method is different from `_convert_row_ordering_to_table_values` in + that it does not arrange null values. There are two reasons: + 1. Manipulating null positions requires more than one ordering key, which is forbidden + by SQL window syntax for range rolling. + 2. Pandas does not allow range rolling on timeseries with nulls. + + Therefore, we opt for the simplest approach here: generate the simplest SQL and follow + the BigQuery engine behavior. 
+ """ + expr = op_compiler.compile_expression( + ordering_column.scalar_expression, value_lookup + ) + + if ordering_column.direction.is_ascending: + return bigframes_vendored.ibis.asc(expr) # type: ignore + return bigframes_vendored.ibis.desc(expr) # type: ignore + + def _string_cast_join_cond( lvalue: ibis_types.Column, rvalue: ibis_types.Column ) -> ibis_types.BooleanColumn: @@ -668,8 +707,14 @@ def _add_boundary( ) -> ibis_expr_builders.LegacyWindowBuilder: if isinstance(bounds, RangeWindowBounds): return ibis_window.range( - start=_to_ibis_boundary(bounds.start), - end=_to_ibis_boundary(bounds.end), + start=_to_ibis_boundary( + None + if bounds.start is None + else utils.timedelta_to_micros(bounds.start) + ), + end=_to_ibis_boundary( + None if bounds.end is None else utils.timedelta_to_micros(bounds.end) + ), ) if isinstance(bounds, RowsWindowBounds): if bounds.start is not None or bounds.end is not None: diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 4228a5fbb6..04d3ea1bf9 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -82,6 +82,7 @@ def _replace_unsupported_ops(node: nodes.BigFrameNode): # TODO: Run all replacement rules as single bottom-up pass node = nodes.bottom_up(node, rewrites.rewrite_slice) node = nodes.bottom_up(node, rewrites.rewrite_timedelta_expressions) + node = nodes.bottom_up(node, rewrites.rewrite_range_rolling) return node diff --git a/bigframes/core/compile/polars/compiler.py b/bigframes/core/compile/polars/compiler.py index 0c8c308042..baa19eb990 100644 --- a/bigframes/core/compile/polars/compiler.py +++ b/bigframes/core/compile/polars/compiler.py @@ -16,7 +16,7 @@ import dataclasses import functools import itertools -from typing import cast, Optional, Sequence, Tuple, TYPE_CHECKING, Union +from typing import cast, Optional, Sequence, Tuple, TYPE_CHECKING import bigframes.core from bigframes.core import window_spec @@ -359,6 +359,7 @@ def 
compile_window(self, node: nodes.WindowOpNode): return df.with_columns([agg_expr]) else: # row-bounded window + assert isinstance(window.bounds, window_spec.RowsWindowBounds) # Polars API semi-bounded, and any grouped rolling window challenging # https://github.com/pola-rs/polars/issues/4799 # https://github.com/pola-rs/polars/issues/8976 @@ -382,9 +383,7 @@ def compile_window(self, node: nodes.WindowOpNode): return pl.concat([df, results], how="horizontal") -def _get_period( - bounds: Union[window_spec.RowsWindowBounds, window_spec.RangeWindowBounds] -) -> Optional[int]: +def _get_period(bounds: window_spec.RowsWindowBounds) -> Optional[int]: """Returns None if the boundary is infinite.""" if bounds.start is None or bounds.end is None: return None diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index c7a458c4f3..a3f3613e4e 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -1353,7 +1353,7 @@ def _validate(self): """Validate the local data in the node.""" # Since inner order and row bounds are coupled, rank ops can't be row bounded assert ( - not self.window_spec.row_bounded + not self.window_spec.is_row_bounded ) or self.expression.op.implicitly_inherits_order assert all(ref in self.child.ids for ref in self.expression.column_references) @@ -1415,7 +1415,9 @@ def inherits_order(self) -> bool: op_inherits_order = ( not self.expression.op.order_independent ) and self.expression.op.implicitly_inherits_order - return op_inherits_order or self.window_spec.row_bounded + # range-bounded windows do not inherit orders because their ordering is + # already defined before rewrite time. 
+ return op_inherits_order or self.window_spec.is_row_bounded @property def additive_base(self) -> BigFrameNode: diff --git a/bigframes/core/rewrite/__init__.py b/bigframes/core/rewrite/__init__.py index e5f7578911..58730805e4 100644 --- a/bigframes/core/rewrite/__init__.py +++ b/bigframes/core/rewrite/__init__.py @@ -19,6 +19,7 @@ from bigframes.core.rewrite.pruning import column_pruning from bigframes.core.rewrite.slices import pullup_limit_from_slice, rewrite_slice from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions +from bigframes.core.rewrite.windows import rewrite_range_rolling __all__ = [ "legacy_join_as_projection", @@ -29,4 +30,5 @@ "remap_variables", "pull_up_order", "column_pruning", + "rewrite_range_rolling", ] diff --git a/bigframes/core/rewrite/windows.py b/bigframes/core/rewrite/windows.py new file mode 100644 index 0000000000..9f55db23af --- /dev/null +++ b/bigframes/core/rewrite/windows.py @@ -0,0 +1,45 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import dataclasses + +from bigframes import operations as ops +from bigframes.core import nodes + + +def rewrite_range_rolling(node: nodes.BigFrameNode) -> nodes.BigFrameNode: + if not isinstance(node, nodes.WindowOpNode): + return node + + if not node.window_spec.is_range_bounded: + return node + + if len(node.window_spec.ordering) != 1: + raise ValueError( + "Range rolling should only be performed on exactly one column." + ) + + ordering_expr = node.window_spec.ordering[0] + + new_ordering = dataclasses.replace( + ordering_expr, + scalar_expression=ops.UnixMicros().as_expr(ordering_expr.scalar_expression), + ) + + return dataclasses.replace( + node, + window_spec=dataclasses.replace(node.window_spec, ordering=(new_ordering,)), + ) diff --git a/bigframes/core/window/ordering.py b/bigframes/core/window/ordering.py new file mode 100644 index 0000000000..6ab66cf4d8 --- /dev/null +++ b/bigframes/core/window/ordering.py @@ -0,0 +1,81 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from functools import singledispatch + +from bigframes.core import expression as ex +from bigframes.core import nodes, ordering + + +@singledispatch +def find_order_direction( + root: nodes.BigFrameNode, column_id: str +) -> ordering.OrderingDirection | None: + """Returns the order of the given column with tree traversal. 
If the column cannot be found, + or the ordering information is not available, return None. + """ + return None + + +@find_order_direction.register +def _(root: nodes.OrderByNode, column_id: str): + if len(root.by) == 0: + # This is a no-op + return find_order_direction(root.child, column_id) + + # Make sure the window key is the prefix of sorting keys. + order_expr = root.by[0] + scalar_expr = order_expr.scalar_expression + if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id: + return order_expr.direction + + return None + + +@find_order_direction.register +def _(root: nodes.ReversedNode, column_id: str): + direction = find_order_direction(root.child, column_id) + + if direction is None: + return None + return direction.reverse() + + +@find_order_direction.register +def _(root: nodes.SelectionNode, column_id: str): + for alias_ref in root.input_output_pairs: + if alias_ref.id.name == column_id: + return find_order_direction(root.child, alias_ref.ref.id.name) + + +@find_order_direction.register +def _(root: nodes.FilterNode, column_id: str): + return find_order_direction(root.child, column_id) + + +@find_order_direction.register +def _(root: nodes.InNode, column_id: str): + return find_order_direction(root.left_child, column_id) + + +@find_order_direction.register +def _(root: nodes.WindowOpNode, column_id: str): + return find_order_direction(root.child, column_id) + + +@find_order_direction.register +def _(root: nodes.ProjectionNode, column_id: str): + return find_order_direction(root.child, column_id) diff --git a/bigframes/core/window/rolling.py b/bigframes/core/window/rolling.py index b10b2da123..b4636bfc8d 100644 --- a/bigframes/core/window/rolling.py +++ b/bigframes/core/window/rolling.py @@ -14,12 +14,18 @@ from __future__ import annotations +import datetime import typing import bigframes_vendored.pandas.core.window.rolling as vendored_pandas_rolling +import numpy +import pandas -from bigframes.core import log_adapter, window_spec 
+from bigframes import dtypes +from bigframes.core import expression as ex +from bigframes.core import log_adapter, ordering, window_spec import bigframes.core.blocks as blocks +from bigframes.core.window import ordering as window_ordering import bigframes.operations.aggregations as agg_ops @@ -118,3 +124,38 @@ def _aggregate_block( labels = [self._block.col_id_to_label[col] for col in agg_col_ids] return block.select_columns(result_ids).with_column_labels(labels) + + +def create_range_window( + block: blocks.Block, + window: pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str, + min_periods: int | None, + closed: typing.Literal["right", "left", "both", "neither"], + is_series: bool, +) -> Window: + + index_dtypes = block.index.dtypes + if len(index_dtypes) > 1: + raise ValueError("Range rolling on MultiIndex is not supported") + if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE: + raise ValueError("Index type should be timestamps with timezones") + + order_direction = window_ordering.find_order_direction( + block.expr.node, block.index_columns[0] + ) + if order_direction is None: + raise ValueError( + "The index might not be in a monotonic order. Please sort the index before rolling." 
+ ) + if isinstance(window, str): + window = pandas.Timedelta(window) + spec = window_spec.WindowSpec( + bounds=window_spec.RangeWindowBounds.from_timedelta_window(window, closed), + min_periods=1 if min_periods is None else min_periods, + ordering=( + ordering.OrderingExpression( + ex.deref(block.index_columns[0]), order_direction + ), + ), + ) + return Window(block, spec, block.value_columns, is_series=is_series) diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 10a5d9119c..d08ba3d12a 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -14,9 +14,13 @@ from __future__ import annotations from dataclasses import dataclass, replace +import datetime import itertools from typing import Literal, Mapping, Optional, Set, Tuple, Union +import numpy as np +import pandas as pd + import bigframes.core.expression as ex import bigframes.core.identifiers as ids import bigframes.core.ordering as orderings @@ -168,9 +172,31 @@ def __post_init__(self): @dataclass(frozen=True) class RangeWindowBounds: - # TODO(b/388916840) Support range rolling on timeseries with timedeltas. 
- start: Optional[int] = None - end: Optional[int] = None + """Represents a time range window, inclusively bounded by start and end""" + + start: pd.Timedelta | None = None + end: pd.Timedelta | None = None + + @classmethod + def from_timedelta_window( + cls, + window: pd.Timedelta | np.timedelta64 | datetime.timedelta, + closed: Literal["right", "left", "both", "neither"], + ) -> RangeWindowBounds: + window = pd.Timedelta(window) + tick = pd.Timedelta("1us") + zero = pd.Timedelta(0) + + if closed == "right": + return cls(-(window - tick), zero) + elif closed == "left": + return cls(-window, -tick) + elif closed == "both": + return cls(-window, zero) + elif closed == "neither": + return cls(-(window - tick), -tick) + else: + raise ValueError(f"Unsupported value for 'closed' parameter: {closed}") def __post_init__(self): if self.start is None: @@ -201,7 +227,7 @@ class WindowSpec: min_periods: int = 0 @property - def row_bounded(self): + def is_row_bounded(self): """ Whether the window is bounded by row offsets. @@ -210,6 +236,26 @@ def row_bounded(self): """ return isinstance(self.bounds, RowsWindowBounds) + @property + def is_range_bounded(self): + """ + Whether the window is bounded by range offsets. + + Range windows are ordered by exactly one key (ties allowed), so unlike + row-bounded windows they do not need a total row order. + """ + return isinstance(self.bounds, RangeWindowBounds) + + @property + def is_unbounded(self): + """ + Whether the window is unbounded. + + This is relevant for determining whether the window requires a total order + to calculate deterministically. 
+ """ + return self.bounds is None + @property def all_referenced_columns(self) -> Set[ids.ColumnId]: """ @@ -222,7 +268,7 @@ def all_referenced_columns(self) -> Set[ids.ColumnId]: def without_order(self) -> WindowSpec: """Removes ordering clause if ordering isn't required to define bounds.""" - if self.row_bounded: + if self.is_row_bounded: raise ValueError("Cannot remove order from row-bounded window") return replace(self, ordering=()) diff --git a/bigframes/series.py b/bigframes/series.py index 305bc93a09..882c601b7c 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -57,6 +57,7 @@ import bigframes.core.utils as utils import bigframes.core.validations as validations import bigframes.core.window +from bigframes.core.window import rolling import bigframes.core.window_spec as windows import bigframes.dataframe import bigframes.dtypes @@ -1549,16 +1550,22 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series: @validations.requires_ordering() def rolling( self, - window: int, - min_periods=None, + window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str, + min_periods: int | None = None, closed: Literal["right", "left", "both", "neither"] = "right", ) -> bigframes.core.window.Window: - window_spec = windows.WindowSpec( - bounds=windows.RowsWindowBounds.from_window_size(window, closed), - min_periods=min_periods if min_periods is not None else window, - ) - return bigframes.core.window.Window( - self._block, window_spec, self._block.value_columns, is_series=True + if isinstance(window, int): + # Rows rolling + window_spec = windows.WindowSpec( + bounds=windows.RowsWindowBounds.from_window_size(window, closed), + min_periods=window if min_periods is None else min_periods, + ) + return bigframes.core.window.Window( + self._block, window_spec, self._block.value_columns, is_series=True + ) + + return rolling.create_range_window( + self._block, window, min_periods, closed, is_series=True ) 
@validations.requires_ordering() diff --git a/tests/system/small/test_window.py b/tests/system/small/test_window.py index bfe97f5103..fe233c3a7a 100644 --- a/tests/system/small/test_window.py +++ b/tests/system/small/test_window.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime + +import numpy as np import pandas as pd import pytest @pytest.fixture(scope="module") -def rolling_dfs(scalars_dfs): +def rows_rolling_dfs(scalars_dfs): bf_df, pd_df = scalars_dfs target_cols = ["int64_too", "float64_col", "int64_col"] @@ -26,7 +29,23 @@ def rolling_dfs(scalars_dfs): @pytest.fixture(scope="module") -def rolling_series(scalars_dfs): +def range_rolling_dfs(session): + pd_df = pd.DataFrame( + { + "ts_col": pd.Timestamp("20250101", tz="UTC") + + pd.to_timedelta(np.arange(10), "s"), + "dt_col": pd.Timestamp("20250101") + pd.to_timedelta(np.arange(10), "s"), + "int_col": np.arange(10), + } + ) + + bf_df = session.read_pandas(pd_df) + + return bf_df, pd_df + + +@pytest.fixture(scope="module") +def rows_rolling_series(scalars_dfs): bf_df, pd_df = scalars_dfs target_col = "int64_too" @@ -34,8 +53,8 @@ def rolling_series(scalars_dfs): @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) -def test_dataframe_rolling_closed_param(rolling_dfs, closed): - bf_df, pd_df = rolling_dfs +def test_dataframe_rolling_closed_param(rows_rolling_dfs, closed): + bf_df, pd_df = rows_rolling_dfs actual_result = bf_df.rolling(window=3, closed=closed).sum().to_pandas() @@ -44,8 +63,8 @@ def test_dataframe_rolling_closed_param(rolling_dfs, closed): @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) -def test_dataframe_groupby_rolling_closed_param(rolling_dfs, closed): - bf_df, pd_df = rolling_dfs +def test_dataframe_groupby_rolling_closed_param(rows_rolling_dfs, closed): + bf_df, pd_df = rows_rolling_dfs # Need to specify column subset for comparison due to b/406841327 
check_columns = ["float64_col", "int64_col"] @@ -64,8 +83,8 @@ def test_dataframe_groupby_rolling_closed_param(rolling_dfs, closed): ) -def test_dataframe_rolling_on(rolling_dfs): - bf_df, pd_df = rolling_dfs +def test_dataframe_rolling_on(rows_rolling_dfs): + bf_df, pd_df = rows_rolling_dfs actual_result = bf_df.rolling(window=3, on="int64_too").sum().to_pandas() @@ -73,15 +92,15 @@ def test_dataframe_rolling_on(rolling_dfs): pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False) -def test_dataframe_rolling_on_invalid_column_raise_error(rolling_dfs): - bf_df, _ = rolling_dfs +def test_dataframe_rolling_on_invalid_column_raise_error(rows_rolling_dfs): + bf_df, _ = rows_rolling_dfs with pytest.raises(ValueError): bf_df.rolling(window=3, on="whatever").sum() -def test_dataframe_groupby_rolling_on(rolling_dfs): - bf_df, pd_df = rolling_dfs +def test_dataframe_groupby_rolling_on(rows_rolling_dfs): + bf_df, pd_df = rows_rolling_dfs # Need to specify column subset for comparison due to b/406841327 check_columns = ["float64_col", "int64_col"] @@ -100,16 +119,16 @@ def test_dataframe_groupby_rolling_on(rolling_dfs): ) -def test_dataframe_groupby_rolling_on_invalid_column_raise_error(rolling_dfs): - bf_df, _ = rolling_dfs +def test_dataframe_groupby_rolling_on_invalid_column_raise_error(rows_rolling_dfs): + bf_df, _ = rows_rolling_dfs with pytest.raises(ValueError): bf_df.groupby(level=0).rolling(window=3, on="whatever").sum() @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) -def test_series_rolling_closed_param(rolling_series, closed): - bf_series, df_series = rolling_series +def test_series_rolling_closed_param(rows_rolling_series, closed): + bf_series, df_series = rows_rolling_series actual_result = bf_series.rolling(window=3, closed=closed).sum().to_pandas() @@ -118,8 +137,8 @@ def test_series_rolling_closed_param(rolling_series, closed): @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) -def 
test_series_groupby_rolling_closed_param(rolling_series, closed): - bf_series, df_series = rolling_series +def test_series_groupby_rolling_closed_param(rows_rolling_series, closed): + bf_series, df_series = rows_rolling_series actual_result = ( bf_series.groupby(bf_series % 2) @@ -159,8 +178,8 @@ def test_series_groupby_rolling_closed_param(rolling_series, closed): pytest.param(lambda x: x.var(), id="var"), ], ) -def test_series_window_agg_ops(rolling_series, windowing, agg_op): - bf_series, pd_series = rolling_series +def test_series_window_agg_ops(rows_rolling_series, windowing, agg_op): + bf_series, pd_series = rows_rolling_series actual_result = agg_op(windowing(bf_series)).to_pandas() @@ -205,3 +224,69 @@ def test_dataframe_window_agg_ops(scalars_dfs, windowing, agg_op): pd_result = agg_op(windowing(pd_df)) pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False) + + +@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) +@pytest.mark.parametrize( + "window", # skipped numpy timedelta because Pandas does not support it. 
+ [pd.Timedelta("3s"), datetime.timedelta(seconds=3), "3s"], +) +@pytest.mark.parametrize("ascending", [True, False]) +def test_series_range_rolling(range_rolling_dfs, window, closed, ascending): + bf_df, pd_df = range_rolling_dfs + bf_series = bf_df.set_index("ts_col")["int_col"] + pd_series = pd_df.set_index("ts_col")["int_col"] + + actual_result = ( + bf_series.sort_index(ascending=ascending) + .rolling(window=window, closed=closed) + .min() + .to_pandas() + ) + + expected_result = ( + pd_series.sort_index(ascending=ascending) + .rolling(window=window, closed=closed) + .min() + ) + pd.testing.assert_series_equal( + actual_result, expected_result, check_dtype=False, check_index=False + ) + + +def test_range_rolling_order_info_lookup(range_rolling_dfs): + bf_df, pd_df = range_rolling_dfs + + actual_result = ( + bf_df.set_index("ts_col") + .sort_index(ascending=False)["int_col"] + .isin(bf_df["int_col"]) + .rolling(window="3s") + .count() + .to_pandas() + ) + + expected_result = ( + pd_df.set_index("ts_col") + .sort_index(ascending=False)["int_col"] + .isin(pd_df["int_col"]) + .rolling(window="3s") + .count() + ) + pd.testing.assert_series_equal( + actual_result, expected_result, check_dtype=False, check_index=False + ) + + +def test_range_rolling_unsupported_index_type_raise_error(range_rolling_dfs): + bf_df, _ = range_rolling_dfs + + with pytest.raises(ValueError): + bf_df["int_col"].sort_index().rolling(window="3s") + + +def test_range_rolling_unsorted_column_raise_error(range_rolling_dfs): + bf_df, _ = range_rolling_dfs + + with pytest.raises(ValueError): + bf_df["int_col"].rolling(window="3s")