diff --git a/bigframes/core/groupby/dataframe_group_by.py b/bigframes/core/groupby/dataframe_group_by.py index eb2a107c1d..f234bad126 100644 --- a/bigframes/core/groupby/dataframe_group_by.py +++ b/bigframes/core/groupby/dataframe_group_by.py @@ -14,11 +14,13 @@ from __future__ import annotations +import datetime import typing from typing import Literal, Sequence, Tuple, Union import bigframes_vendored.constants as constants import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby +import numpy import pandas as pd from bigframes import session @@ -30,6 +32,7 @@ import bigframes.core.ordering as order import bigframes.core.utils as utils import bigframes.core.validations as validations +from bigframes.core.window import rolling import bigframes.core.window as windows import bigframes.core.window_spec as window_specs import bigframes.dataframe as df @@ -309,28 +312,41 @@ def diff(self, periods=1) -> series.Series: @validations.requires_ordering() def rolling( self, - window: int, + window: int | pd.Timedelta | numpy.timedelta64 | datetime.timedelta | str, min_periods=None, on: str | None = None, closed: Literal["right", "left", "both", "neither"] = "right", ) -> windows.Window: - window_spec = window_specs.WindowSpec( - bounds=window_specs.RowsWindowBounds.from_window_size(window, closed), - min_periods=min_periods if min_periods is not None else window, - grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids), - ) - block = self._block.order_by( - [order.ascending_over(col) for col in self._by_col_ids], - ) - skip_agg_col_id = ( - None if on is None else self._block.resolve_label_exact_or_error(on) - ) - return windows.Window( - block, - window_spec, - self._selected_cols, + if isinstance(window, int): + window_spec = window_specs.WindowSpec( + bounds=window_specs.RowsWindowBounds.from_window_size(window, closed), + min_periods=min_periods if min_periods is not None else window, + grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids), + ) + block = self._block.order_by( + [order.ascending_over(col) for col in self._by_col_ids], + ) + skip_agg_col_id = ( + None if on is None else self._block.resolve_label_exact_or_error(on) + ) + return windows.Window( + block, + window_spec, + self._selected_cols, + drop_null_groups=self._dropna, + skip_agg_column_id=skip_agg_col_id, + ) + + return rolling.create_range_window( + self._block, + window, + min_periods=min_periods, + value_column_ids=self._selected_cols, + on=on, + closed=closed, + is_series=False, + grouping_keys=self._by_col_ids, drop_null_groups=self._dropna, - skip_agg_column_id=skip_agg_col_id, ) @validations.requires_ordering() diff --git a/bigframes/core/groupby/series_group_by.py b/bigframes/core/groupby/series_group_by.py index 6d3c1252de..a29bb45a32 100644 --- a/bigframes/core/groupby/series_group_by.py +++ b/bigframes/core/groupby/series_group_by.py @@ -14,11 +14,14 @@ from __future__ import annotations +import datetime import typing from typing import Literal, Sequence, Union import bigframes_vendored.constants as constants import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby +import numpy +import pandas from bigframes import session from bigframes.core import expression as ex @@ -29,6 +32,7 @@ import bigframes.core.ordering as order import bigframes.core.utils as utils import bigframes.core.validations as validations +from bigframes.core.window import rolling import bigframes.core.window as windows import bigframes.core.window_spec as window_specs import bigframes.dataframe as df @@ -246,24 +250,36 @@ def diff(self, periods=1) -> series.Series: @validations.requires_ordering() def rolling( self, - window: int, + window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str, min_periods=None, closed: Literal["right", "left", "both", "neither"] = "right", ) -> windows.Window: - window_spec = window_specs.WindowSpec( - bounds=window_specs.RowsWindowBounds.from_window_size(window, closed), - min_periods=min_periods if min_periods is not None else window, - grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids), - ) - block = self._block.order_by( - [order.ascending_over(col) for col in self._by_col_ids], - ) - return windows.Window( - block, - window_spec, - [self._value_column], - drop_null_groups=self._dropna, + if isinstance(window, int): + window_spec = window_specs.WindowSpec( + bounds=window_specs.RowsWindowBounds.from_window_size(window, closed), + min_periods=min_periods if min_periods is not None else window, + grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids), + ) + block = self._block.order_by( + [order.ascending_over(col) for col in self._by_col_ids], + ) + return windows.Window( + block, + window_spec, + [self._value_column], + drop_null_groups=self._dropna, + is_series=True, + ) + + return rolling.create_range_window( + self._block, + window, + min_periods=min_periods, + value_column_ids=[self._value_column], + closed=closed, is_series=True, + grouping_keys=self._by_col_ids, + drop_null_groups=self._dropna, ) @validations.requires_ordering() diff --git a/bigframes/core/window/ordering.py b/bigframes/core/window/ordering.py index 6ab66cf4d8..0bea585bb0 100644 --- a/bigframes/core/window/ordering.py +++ b/bigframes/core/window/ordering.py @@ -78,4 +78,9 @@ def _(root: nodes.WindowOpNode, column_id: str): @find_order_direction.register def _(root: nodes.ProjectionNode, column_id: str): + for expr, ref in root.assignments: + if ref.name == column_id and isinstance(expr, ex.DerefOp): + # This source column is renamed. + return find_order_direction(root.child, expr.id.name) + return find_order_direction(root.child, column_id) diff --git a/bigframes/core/window/rolling.py b/bigframes/core/window/rolling.py index b4636bfc8d..a9c6dfdfa7 100644 --- a/bigframes/core/window/rolling.py +++ b/bigframes/core/window/rolling.py @@ -76,18 +76,7 @@ def _apply_aggregate( self, op: agg_ops.UnaryAggregateOp, ): - agg_col_ids = [ - col_id - for col_id in self._value_column_ids - if col_id != self._skip_agg_column_id - ] - agg_block = self._aggregate_block(op, agg_col_ids) - - if self._skip_agg_column_id is not None: - # Concat the skipped column to the result. - agg_block, _ = agg_block.join( - self._block.select_column(self._skip_agg_column_id), how="outer" - ) + agg_block = self._aggregate_block(op) if self._is_series: from bigframes.series import Series @@ -102,9 +91,12 @@ def _apply_aggregate( ] return DataFrame(agg_block)._reindex_columns(column_labels) - def _aggregate_block( - self, op: agg_ops.UnaryAggregateOp, agg_col_ids: typing.List[str] - ) -> blocks.Block: + def _aggregate_block(self, op: agg_ops.UnaryAggregateOp) -> blocks.Block: + agg_col_ids = [ + col_id + for col_id in self._value_column_ids + if col_id != self._skip_agg_column_id + ] block, result_ids = self._block.multi_apply_window_op( agg_col_ids, op, @@ -123,29 +115,47 @@ def _aggregate_block( block = block.set_index(col_ids=index_ids) labels = [self._block.col_id_to_label[col] for col in agg_col_ids] + if self._skip_agg_column_id is not None: + result_ids = [self._skip_agg_column_id, *result_ids] + labels.insert(0, self._block.col_id_to_label[self._skip_agg_column_id]) + return block.select_columns(result_ids).with_column_labels(labels) def create_range_window( block: blocks.Block, window: pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str, + *, + value_column_ids: typing.Sequence[str] = tuple(), min_periods: int | None, + on: str | None = None, closed: typing.Literal["right", "left", "both", "neither"], is_series: bool, + grouping_keys: typing.Sequence[str] = tuple(), + drop_null_groups: bool = True, ) -> Window: - index_dtypes = block.index.dtypes - if len(index_dtypes) > 1: - raise ValueError("Range rolling on MultiIndex is not supported") - if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE: - raise ValueError("Index type should be timestamps with timezones") + if on is None: + # Rolling on index + index_dtypes = block.index.dtypes + if len(index_dtypes) > 1: + raise ValueError("Range rolling on MultiIndex is not supported") + if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE: + raise ValueError("Index type should be timestamps with timezones") + rolling_key_col_id = block.index_columns[0] + else: + # Rolling on a specific column + rolling_key_col_id = block.resolve_label_exact_or_error(on) + if block.expr.get_column_type(rolling_key_col_id) != dtypes.TIMESTAMP_DTYPE: + raise ValueError(f"Column {on} type should be timestamps with timezones") order_direction = window_ordering.find_order_direction( - block.expr.node, block.index_columns[0] + block.expr.node, rolling_key_col_id ) if order_direction is None: + target_str = "index" if on is None else f"column {on}" raise ValueError( - "The index might not be in a monotonic order. Please sort the index before rolling." + f"The {target_str} might not be in a monotonic order. Please sort by {target_str} before rolling." ) if isinstance(window, str): window = pandas.Timedelta(window) @@ -153,9 +163,23 @@ def create_range_window( bounds=window_spec.RangeWindowBounds.from_timedelta_window(window, closed), min_periods=1 if min_periods is None else min_periods, ordering=( - ordering.OrderingExpression( - ex.deref(block.index_columns[0]), order_direction - ), + ordering.OrderingExpression(ex.deref(rolling_key_col_id), order_direction), ), + grouping_keys=tuple(ex.deref(col) for col in grouping_keys), + ) + + selected_value_col_ids = ( + value_column_ids if value_column_ids else block.value_columns + ) + # This step must be done after finding the order direction of the window key. + if grouping_keys: + block = block.order_by([ordering.ascending_over(col) for col in grouping_keys]) + + return Window( + block, + spec, + value_column_ids=selected_value_col_ids, + is_series=is_series, + skip_agg_column_id=None if on is None else rolling_key_col_id, + drop_null_groups=drop_null_groups, ) - return Window(block, spec, block.value_columns, is_series=is_series) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 53490d7771..95ea487786 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -67,12 +67,12 @@ import bigframes.core.utils as utils import bigframes.core.validations as validations import bigframes.core.window +from bigframes.core.window import rolling import bigframes.core.window_spec as windows import bigframes.dtypes import bigframes.exceptions as bfe import bigframes.formatting_helpers as formatter import bigframes.operations as ops -import bigframes.operations.aggregations import bigframes.operations.aggregations as agg_ops import bigframes.operations.ai import bigframes.operations.plotting as plotting @@ -3393,23 +3393,33 @@ def _perform_join_by_index( @validations.requires_ordering() def rolling( self, - window: int, + window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str, min_periods=None, on: str | None = None, closed: Literal["right", "left", "both", "neither"] = "right", ) -> bigframes.core.window.Window: - window_def = windows.WindowSpec( - bounds=windows.RowsWindowBounds.from_window_size(window, closed), - min_periods=min_periods if min_periods is not None else window, - ) - skip_agg_col_id = ( - None if on is None else self._block.resolve_label_exact_or_error(on) - ) - return bigframes.core.window.Window( + if isinstance(window, int): + window_def = windows.WindowSpec( + bounds=windows.RowsWindowBounds.from_window_size(window, closed), + min_periods=min_periods if min_periods is not None else window, + ) + skip_agg_col_id = ( + None if on is None else self._block.resolve_label_exact_or_error(on) + ) + return bigframes.core.window.Window( + self._block, + window_def, + self._block.value_columns, + skip_agg_column_id=skip_agg_col_id, + ) + + return rolling.create_range_window( self._block, - window_def, - self._block.value_columns, - skip_agg_column_id=skip_agg_col_id, + window, + min_periods=min_periods, + on=on, + closed=closed, + is_series=False, ) @validations.requires_ordering() diff --git a/bigframes/series.py b/bigframes/series.py index 559f7ef48e..87f1f1d141 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1590,7 +1590,11 @@ def rolling( ) return rolling.create_range_window( - self._block, window, min_periods, closed, is_series=True + block=self._block, + window=window, + min_periods=min_periods, + closed=closed, + is_series=True, ) @validations.requires_ordering() diff --git a/tests/system/small/test_window.py b/tests/system/small/test_window.py index fe233c3a7a..b48bb8bc86 100644 --- a/tests/system/small/test_window.py +++ b/tests/system/small/test_window.py @@ -18,6 +18,8 @@ import pandas as pd import pytest +from bigframes import dtypes + @pytest.fixture(scope="module") def rows_rolling_dfs(scalars_dfs): @@ -30,12 +32,12 @@ def rows_rolling_dfs(scalars_dfs): @pytest.fixture(scope="module") def range_rolling_dfs(session): + values = np.arange(20) pd_df = pd.DataFrame( { - "ts_col": pd.Timestamp("20250101", tz="UTC") - + pd.to_timedelta(np.arange(10), "s"), - "dt_col": pd.Timestamp("20250101") + pd.to_timedelta(np.arange(10), "s"), - "int_col": np.arange(10), + "ts_col": pd.Timestamp("20250101", tz="UTC") + pd.to_timedelta(values, "s"), + "int_col": values % 4, + "float_col": values / 2, } ) @@ -254,6 +256,102 @@ def test_series_range_rolling(range_rolling_dfs, window, closed, ascending): ) +def test_series_groupby_range_rolling(range_rolling_dfs): + bf_df, pd_df = range_rolling_dfs + bf_series = bf_df.set_index("ts_col")["int_col"] + pd_series = pd_df.set_index("ts_col")["int_col"] + + actual_result = ( + bf_series.sort_index() + .groupby(bf_series % 2 == 0) + .rolling(window="3s") + .min() + .to_pandas() + ) + + expected_result = ( + pd_series.sort_index().groupby(pd_series % 2 == 0).rolling(window="3s").min() + ) + pd.testing.assert_series_equal( + actual_result, expected_result, check_dtype=False, check_index=False + ) + + +@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) +@pytest.mark.parametrize( + "window", # skipped numpy timedelta because Pandas does not support it. + [pd.Timedelta("3s"), datetime.timedelta(seconds=3), "3s"], +) +@pytest.mark.parametrize("ascending", [True, False]) +def test_dataframe_range_rolling(range_rolling_dfs, window, closed, ascending): + bf_df, pd_df = range_rolling_dfs + bf_df = bf_df.set_index("ts_col") + pd_df = pd_df.set_index("ts_col") + + actual_result = ( + bf_df.sort_index(ascending=ascending) + .rolling(window=window, closed=closed) + .min() + .to_pandas() + ) + + expected_result = ( + pd_df.sort_index(ascending=ascending) + .rolling(window=window, closed=closed) + .min() + ) + # Need to cast Pandas index type. Otherwise it uses DatetimeIndex that + # does not exist in BigFrame + expected_result.index = expected_result.index.astype(dtypes.TIMESTAMP_DTYPE) + pd.testing.assert_frame_equal( + actual_result, + expected_result, + check_dtype=False, + ) + + +def test_dataframe_range_rolling_on(range_rolling_dfs): + bf_df, pd_df = range_rolling_dfs + on = "ts_col" + + actual_result = bf_df.sort_values(on).rolling(window="3s", on=on).min().to_pandas() + + expected_result = pd_df.sort_values(on).rolling(window="3s", on=on).min() + # Need to specify the column order because Pandas (seemingly) + # re-arranges columns alphabetically + cols = ["ts_col", "int_col", "float_col"] + pd.testing.assert_frame_equal( + actual_result[cols], + expected_result[cols], + check_dtype=False, + check_index_type=False, + ) + + +def test_dataframe_groupby_range_rolling(range_rolling_dfs): + bf_df, pd_df = range_rolling_dfs + on = "ts_col" + + actual_result = ( + bf_df.sort_values(on) + .groupby("int_col") + .rolling(window="3s", on=on) + .min() + .to_pandas() + ) + + expected_result = ( + pd_df.sort_values(on).groupby("int_col").rolling(window="3s", on=on).min() + ) + expected_result.index = expected_result.index.set_names("index", level=1) + pd.testing.assert_frame_equal( + actual_result, + expected_result, + check_dtype=False, + check_index_type=False, + ) + + def test_range_rolling_order_info_lookup(range_rolling_dfs): bf_df, pd_df = range_rolling_dfs @@ -285,8 +383,15 @@ def test_range_rolling_unsupported_index_type_raise_error(range_rolling_dfs): bf_df["int_col"].sort_index().rolling(window="3s") +def test_range_rolling_unsorted_index_raise_error(range_rolling_dfs): + bf_df, _ = range_rolling_dfs + + with pytest.raises(ValueError): + bf_df.set_index("ts_col")["int_col"].rolling(window="3s") + + def test_range_rolling_unsorted_column_raise_error(range_rolling_dfs): bf_df, _ = range_rolling_dfs with pytest.raises(ValueError): - bf_df["int_col"].rolling(window="3s") + bf_df.rolling(window="3s", on="ts_col") diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index ee2400dfd8..4c9d1338f4 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -1083,12 +1083,17 @@ def rolling( [4 rows x 2 columns] Args: - window (int): + window (int, pandas.Timedelta, numpy.timedelta64, datetime.timedelta, str): Size of the moving window. If an integer, the fixed number of observations used for each window. + If a string, the timedelta representation in string. This string + must be parsable by pandas.Timedelta(). + + Otherwise, the time range for each window. + min_periods (int, default None): Minimum number of observations in window required to have a value; otherwise, result is ``np.nan``. @@ -1096,6 +1101,9 @@ def rolling( For a window that is specified by an integer, ``min_periods`` will default to the size of the window. + For a window that is not spicified by an interger, ``min_periods`` will default + to 1. + on (str, optional): For a DataFrame, a column label on which to calculate the rolling window, rather than the DataFrame’s index. diff --git a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py index 1b5bd308e3..4fb8498932 100644 --- a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py +++ b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py @@ -1025,16 +1025,27 @@ def rolling(self, *args, **kwargs): dtype: Int64 Args: + window (int, pandas.Timedelta, numpy.timedelta64, datetime.timedelta, str): + Size of the moving window. + + If an integer, the fixed number of observations used for + each window. + + If a string, the timedelta representation in string. This string + must be parsable by pandas.Timedelta(). + + Otherwise, the time range for each window. + min_periods (int, default None): Minimum number of observations in window required to have a value; otherwise, result is ``np.nan``. - For a window that is specified by an offset, - ``min_periods`` will default to 1. - For a window that is specified by an integer, ``min_periods`` will default to the size of the window. + For a window that is not spicified by an interger, ``min_periods`` will default + to 1. + on (str, optional): For a DataFrame, a column label on which to calculate the rolling window, rather than the DataFrame’s index.