From da0372e908b0f4b46b5f954960e5a71631a25771 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 3 Apr 2025 21:27:12 +0000 Subject: [PATCH 01/15] [WIP] implement range rolling with mods at expression level --- bigframes/core/compile/compiled.py | 36 ++++++++----- bigframes/core/window/rolling.py | 82 +++++++++++++++++++++++++++++- bigframes/core/window_spec.py | 45 ++++++++++++++-- bigframes/series.py | 23 ++++++--- 4 files changed, 161 insertions(+), 25 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 4443c495d7..d1e878505d 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -463,9 +463,6 @@ def project_window_op( never_skip_nulls=never_skip_nulls, ) - if expression.op.order_independent and not window_spec.row_bounded: - # notably percentile_cont does not support ordering clause - window_spec = window_spec.without_order() window = self._ibis_window_from_spec(window_spec) bindings = {col: self._get_ibis_column(col) for col in self.column_ids} @@ -541,17 +538,30 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec): # 1. Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed # 2. Order-independent op (aggregation, cut, rank) with range window - use ordering clause, ties allowed # 3. Order-depedenpent op (navigation functions, array_agg) or rows bounds - use total row order to break ties. - if window_spec.ordering: + if window_spec.row_bounded: + if len(window_spec.ordering) == 0: + raise ValueError("No ordering provided for ordered analytic function") order_by = _convert_ordering_to_table_values( - self._column_names, - window_spec.ordering, + self._column_names, window_spec.ordering ) - elif window_spec.row_bounded: - # If window spec has following or preceding bounds, we need to apply an unambiguous ordering. - raise ValueError("No ordering provided for ordered analytic function") else: - # Unbound grouping window. Suitable for aggregations but not for analytic function application. - order_by = None + # Range rolling + ordering_col = window_spec.ordering[0] + ibis_ref = op_compiler.compile_expression( + ordering_col.scalar_expression, self._column_names + ) + # Decorate the rolling column for time-range rolling + if ibis_ref.type().timezone is None: + # We cannot directly use UNIX_MICROS on DATETIME, so we cast the column + # to TIMESTAMP(tz=UTC) + ibis_ref = ibis_ref.cast(ibis_dtypes.Timestamp(timezone="UTC")) + ibis_ref = scalar_op_compiler.unix_micros(ibis_ref) + + order_by = [ + bigframes_vendored.ibis.asc(ibis_ref) # type:ignore + if ordering_col.direction.is_ascending + else bigframes_vendored.ibis.desc(ibis_ref) # type:ignore + ] window = bigframes_vendored.ibis.window(order_by=order_by, group_by=group_by) if window_spec.bounds is not None: @@ -692,8 +702,8 @@ def _add_boundary( ) -> ibis_expr_builders.LegacyWindowBuilder: if isinstance(bounds, RangeWindowBounds): return ibis_window.range( - start=_to_ibis_boundary(bounds.start), - end=_to_ibis_boundary(bounds.end), + start=_to_ibis_boundary(bounds.start_micros), + end=_to_ibis_boundary(bounds.end_micros), ) if isinstance(bounds, RowsWindowBounds): if bounds.start is not None or bounds.end is not None: diff --git a/bigframes/core/window/rolling.py b/bigframes/core/window/rolling.py index b10b2da123..98a981f9ce 100644 --- a/bigframes/core/window/rolling.py +++ b/bigframes/core/window/rolling.py @@ -14,11 +14,17 @@ from __future__ import annotations +import datetime +from functools import singledispatch import typing import bigframes_vendored.pandas.core.window.rolling as vendored_pandas_rolling +import numpy +import pandas -from bigframes.core import log_adapter, window_spec +from bigframes import dtypes +from bigframes.core import expression as ex +from bigframes.core import log_adapter, nodes, ordering, window_spec import bigframes.core.blocks as blocks import bigframes.operations.aggregations as agg_ops @@ -118,3 +124,77 @@ def _aggregate_block( labels = [self._block.col_id_to_label[col] for col in agg_col_ids] return block.select_columns(result_ids).with_column_labels(labels) + + +def create_range_window( + block: blocks.Block, + window: pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str, + min_periods: int | None, + closed: typing.Literal["right", "left", "both", "neither"], + is_series: bool, +) -> Window: + + index_dtypes = block.index.dtypes + if len(index_dtypes) > 1: + raise ValueError("Range rolling on MultiIndex is not supported") + if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE: + raise ValueError("Index type should be timestamps with timezones") + + order_direction = _find_ordering(block.expr.node, block.index_columns[0]) + if order_direction is None: + raise ValueError( + "The index might not be in a monotonic order. Please sort the index before rolling." + ) + if isinstance(window, str): + window = pandas.Timedelta(window) + spec = window_spec.WindowSpec( + bounds=window_spec.RangeWindowBounds.from_timedelta_window(window, closed), + min_periods=1 if min_periods is None else min_periods, + ordering=( + ordering.OrderingExpression( + ex.deref(block.index_columns[0]), order_direction + ), + ), + ) + return Window(block, spec, block.value_columns, is_series=is_series) + + +@singledispatch +def _find_ordering( + root: nodes.BigFrameNode, column_id: str +) -> ordering.OrderingDirection | None: + """Returns the order of the given column with tree traversal. If the column cannot be found, + or the ordering information is not available, return None. + """ + return None + + +@_find_ordering.register +def _(root: nodes.OrderByNode, column_id: str) -> ordering.OrderingDirection | None: + for order_expr in root.by: + scalar_expr = order_expr.scalar_expression + if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id: + return order_expr.direction + + return None + + +@_find_ordering.register +def _(root: nodes.ReversedNode, column_id: str): + direction = _find_ordering(root.child, column_id) + + if direction is None: + return None + return direction.reverse() + + +@_find_ordering.register +def _(root: nodes.SelectionNode, column_id: str): + for alias_ref in root.input_output_pairs: + if alias_ref.id.name == column_id: + return _find_ordering(root.child, alias_ref.ref.id.name) + + +@_find_ordering.register +def _(root: nodes.FilterNode, column_id: str): + return _find_ordering(root.child, column_id) diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 10a5d9119c..ef54e64dc3 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -14,9 +14,14 @@ from __future__ import annotations from dataclasses import dataclass, replace +import datetime import itertools from typing import Literal, Mapping, Optional, Set, Tuple, Union +import numpy as np +import pandas as pd + +from bigframes.core import utils import bigframes.core.expression as ex import bigframes.core.identifiers as ids import bigframes.core.ordering as orderings @@ -168,9 +173,43 @@ def __post_init__(self): @dataclass(frozen=True) class RangeWindowBounds: - # TODO(b/388916840) Support range rolling on timeseries with timedeltas. - start: Optional[int] = None - end: Optional[int] = None + """Represents a time range window, inclusively bounded by start and end""" + + start: pd.Timedelta | None = None + end: pd.Timedelta | None = None + + @classmethod + def from_timedelta_window( + cls, + window: pd.Timedelta | np.timedelta64 | datetime.timedelta, + closed: Literal["right", "left", "both", "neither"], + ) -> RangeWindowBounds: + window = pd.Timedelta(window) + tick = pd.Timedelta("1us") + zero = pd.Timedelta(0) + + if closed == "right": + return cls(-(window - tick), zero) + elif closed == "left": + return cls(-window, -tick) + elif closed == "both": + return cls(-window, zero) + elif closed == "neither": + return cls(-(window - tick), -tick) + else: + raise ValueError(f"Unsupported value for 'closed' parameter: {closed}") + + @property + def start_micros(self) -> int | None: + if self.start is None: + return None + return utils.timedelta_to_micros(self.start) + + @property + def end_micros(self) -> int | None: + if self.end is None: + return None + return utils.timedelta_to_micros(self.end) def __post_init__(self): if self.start is None: diff --git a/bigframes/series.py b/bigframes/series.py index d2a3dcf78f..ddf3f6df78 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -46,6 +46,7 @@ import bigframes.core.utils as utils import bigframes.core.validations as validations import bigframes.core.window +from bigframes.core.window import rolling import bigframes.core.window_spec as windows import bigframes.dataframe import bigframes.dtypes @@ -1441,16 +1442,22 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series: @validations.requires_ordering() def rolling( self, - window: int, - min_periods=None, + window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str, + min_periods: int | None = None, closed: Literal["right", "left", "both", "neither"] = "right", ) -> bigframes.core.window.Window: - window_spec = windows.WindowSpec( - bounds=windows.RowsWindowBounds.from_window_size(window, closed), - min_periods=min_periods if min_periods is not None else window, - ) - return bigframes.core.window.Window( - self._block, window_spec, self._block.value_columns, is_series=True + if isinstance(window, int): + # Rows rolling + window_spec = windows.WindowSpec( + bounds=windows.RowsWindowBounds.from_window_size(window, closed), + min_periods=window if min_periods is None else min_periods, + ) + return bigframes.core.window.Window( + self._block, window_spec, self._block.value_columns, is_series=True + ) + + return rolling.create_range_window( + self._block, window, min_periods, closed, is_series=True ) @validations.requires_ordering() From d253ca75e5f82ebc1980fac4a678298c46a5c978 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 3 Apr 2025 23:32:32 +0000 Subject: [PATCH 02/15] implement range rolling for series --- bigframes/core/compile/compiled.py | 46 +++++++++++++++-------------- bigframes/core/compile/compiler.py | 1 + bigframes/core/rewrite/__init__.py | 2 ++ bigframes/core/rewrite/windows.py | 47 ++++++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 22 deletions(-) create mode 100644 bigframes/core/rewrite/windows.py diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index d1e878505d..f41f5d7882 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -231,7 +231,7 @@ def aggregate( col_out: agg_compiler.compile_aggregate( aggregate, bindings, - order_by=_convert_ordering_to_table_values(table, order_by), + order_by=_convert_row_ordering_to_table_values(table, order_by), ) for aggregate, col_out in aggregations } @@ -538,30 +538,19 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec): # 1. Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed # 2. Order-independent op (aggregation, cut, rank) with range window - use ordering clause, ties allowed # 3. Order-depedenpent op (navigation functions, array_agg) or rows bounds - use total row order to break ties. + if not window_spec.ordering: + # If window spec has following or preceding bounds, we need to apply an unambiguous ordering. + raise ValueError("No ordering provided for ordered analytic function") + if window_spec.row_bounded: - if len(window_spec.ordering) == 0: - raise ValueError("No ordering provided for ordered analytic function") - order_by = _convert_ordering_to_table_values( - self._column_names, window_spec.ordering + order_by = _convert_row_ordering_to_table_values( + self._column_names, + window_spec.ordering, ) else: - # Range rolling - ordering_col = window_spec.ordering[0] - ibis_ref = op_compiler.compile_expression( - ordering_col.scalar_expression, self._column_names + order_by = _convert_range_ordering_to_table_value( + self._column_names, window_spec.ordering[0] ) - # Decorate the rolling column for time-range rolling - if ibis_ref.type().timezone is None: - # We cannot directly use UNIX_MICROS on DATETIME, so we cast the column - # to TIMESTAMP(tz=UTC) - ibis_ref = ibis_ref.cast(ibis_dtypes.Timestamp(timezone="UTC")) - ibis_ref = scalar_op_compiler.unix_micros(ibis_ref) - - order_by = [ - bigframes_vendored.ibis.asc(ibis_ref) # type:ignore - if ordering_col.direction.is_ascending - else bigframes_vendored.ibis.desc(ibis_ref) # type:ignore - ] window = bigframes_vendored.ibis.window(order_by=order_by, group_by=group_by) if window_spec.bounds is not None: @@ -585,7 +574,7 @@ def is_window(column: ibis_types.Value) -> bool: return any(isinstance(op, ibis_ops.WindowFunction) for op in matches) -def _convert_ordering_to_table_values( +def _convert_row_ordering_to_table_values( value_lookup: typing.Mapping[str, ibis_types.Value], ordering_columns: typing.Sequence[OrderingExpression], ) -> typing.Sequence[ibis_types.Value]: @@ -613,6 +602,19 @@ def _convert_ordering_to_table_values( return ordering_values +def _convert_range_ordering_to_table_value( + value_lookup: typing.Mapping[str, ibis_types.Value], + ordering_column: OrderingExpression, +) -> ibis_types.Value: + expr = op_compiler.compile_expression( + ordering_column.scalar_expression, value_lookup + ) + + if ordering_column.direction.is_ascending: + return bigframes_vendored.ibis.asc(expr) + return bigframes_vendored.ibis.desc(expr) + + def _string_cast_join_cond( lvalue: ibis_types.Column, rvalue: ibis_types.Column ) -> ibis_types.BooleanColumn: diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 3d9bf19f76..167e4b7f75 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -86,6 +86,7 @@ def _replace_unsupported_ops(node: nodes.BigFrameNode): # TODO: Run all replacement rules as single bottom-up pass node = nodes.bottom_up(node, rewrites.rewrite_slice) node = nodes.bottom_up(node, rewrites.rewrite_timedelta_expressions) + node = nodes.bottom_up(node, rewrites.rewrite_range_rolling) return node diff --git a/bigframes/core/rewrite/__init__.py b/bigframes/core/rewrite/__init__.py index e5f7578911..58730805e4 100644 --- a/bigframes/core/rewrite/__init__.py +++ b/bigframes/core/rewrite/__init__.py @@ -19,6 +19,7 @@ from bigframes.core.rewrite.pruning import column_pruning from bigframes.core.rewrite.slices import pullup_limit_from_slice, rewrite_slice from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions +from bigframes.core.rewrite.windows import rewrite_range_rolling __all__ = [ "legacy_join_as_projection", @@ -29,4 +30,5 @@ "remap_variables", "pull_up_order", "column_pruning", + "rewrite_range_rolling", ] diff --git a/bigframes/core/rewrite/windows.py b/bigframes/core/rewrite/windows.py new file mode 100644 index 0000000000..d696acb808 --- /dev/null +++ b/bigframes/core/rewrite/windows.py @@ -0,0 +1,47 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import dataclasses + +from bigframes import dtypes +from bigframes import operations as ops +from bigframes.core import nodes + + +def rewrite_range_rolling(root: nodes.BigFrameNode) -> nodes.BigFrameNode: + if isinstance(root, nodes.WindowOpNode): + return _rewrite_range_rolling_node(root) + + return root + + +def _rewrite_range_rolling_node(node: nodes.WindowOpNode) -> nodes.BigFrameNode: + if len(node.window_spec.ordering) != 1: + raise ValueError( + "Range rolling should only be performed on exactly one column." + ) + + ordering_expr = node.window_spec.ordering[0] + + new_ordering = dataclasses.replace( + ordering_expr, + scalar_expression=ops.UnixMicros().as_expr(ordering_expr.scalar_expression), + ) + + return dataclasses.replace( + node, + window_spec=dataclasses.replace(node.window_spec, ordering=(new_ordering,)), + ) From e873c07d20e6f0de5ce15cca35ac4cb28cb44877 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 3 Apr 2025 23:34:55 +0000 Subject: [PATCH 03/15] fix lint --- bigframes/core/rewrite/windows.py | 1 - tests/system/small/test_window.py | 123 +++++++++++++++++++++++++----- 2 files changed, 103 insertions(+), 21 deletions(-) diff --git a/bigframes/core/rewrite/windows.py b/bigframes/core/rewrite/windows.py index d696acb808..ffce7eb6be 100644 --- a/bigframes/core/rewrite/windows.py +++ b/bigframes/core/rewrite/windows.py @@ -16,7 +16,6 @@ import dataclasses -from bigframes import dtypes from bigframes import operations as ops from bigframes.core import nodes diff --git a/tests/system/small/test_window.py b/tests/system/small/test_window.py index bfe97f5103..ed90b632d4 100644 --- a/tests/system/small/test_window.py +++ b/tests/system/small/test_window.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime + +import numpy as np import pandas as pd import pytest @pytest.fixture(scope="module") -def rolling_dfs(scalars_dfs): +def rows_rolling_dfs(scalars_dfs): bf_df, pd_df = scalars_dfs target_cols = ["int64_too", "float64_col", "int64_col"] @@ -26,7 +29,23 @@ def rolling_dfs(scalars_dfs): @pytest.fixture(scope="module") -def rolling_series(scalars_dfs): +def range_rolling_dfs(session): + pd_df = pd.DataFrame( + { + "ts_col": pd.Timestamp("20250101", tz="UTC") + + pd.to_timedelta(np.arange(10), "s"), + "dt_col": pd.Timestamp("20250101") + pd.to_timedelta(np.arange(10), "s"), + "int_col": np.arange(10), + } + ) + + bf_df = session.read_pandas(pd_df) + + return bf_df, pd_df + + +@pytest.fixture(scope="module") +def rows_rolling_series(scalars_dfs): bf_df, pd_df = scalars_dfs target_col = "int64_too" @@ -34,8 +53,8 @@ def rolling_series(scalars_dfs): @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) -def test_dataframe_rolling_closed_param(rolling_dfs, closed): - bf_df, pd_df = rolling_dfs +def test_dataframe_rolling_closed_param(rows_rolling_dfs, closed): + bf_df, pd_df = rows_rolling_dfs actual_result = bf_df.rolling(window=3, closed=closed).sum().to_pandas() @@ -44,8 +63,8 @@ def test_dataframe_rolling_closed_param(rolling_dfs, closed): @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) -def test_dataframe_groupby_rolling_closed_param(rolling_dfs, closed): - bf_df, pd_df = rolling_dfs +def test_dataframe_groupby_rolling_closed_param(rows_rolling_dfs, closed): + bf_df, pd_df = rows_rolling_dfs # Need to specify column subset for comparison due to b/406841327 check_columns = ["float64_col", "int64_col"] @@ -64,8 +83,8 @@ def test_dataframe_groupby_rolling_closed_param(rolling_dfs, closed): ) -def test_dataframe_rolling_on(rolling_dfs): - bf_df, pd_df = rolling_dfs +def test_dataframe_rolling_on(rows_rolling_dfs): + bf_df, pd_df = rows_rolling_dfs actual_result = bf_df.rolling(window=3, on="int64_too").sum().to_pandas() @@ -73,15 +92,15 @@ def test_dataframe_rolling_on(rolling_dfs): pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False) -def test_dataframe_rolling_on_invalid_column_raise_error(rolling_dfs): - bf_df, _ = rolling_dfs +def test_dataframe_rolling_on_invalid_column_raise_error(rows_rolling_dfs): + bf_df, _ = rows_rolling_dfs with pytest.raises(ValueError): bf_df.rolling(window=3, on="whatever").sum() -def test_dataframe_groupby_rolling_on(rolling_dfs): - bf_df, pd_df = rolling_dfs +def test_dataframe_groupby_rolling_on(rows_rolling_dfs): + bf_df, pd_df = rows_rolling_dfs # Need to specify column subset for comparison due to b/406841327 check_columns = ["float64_col", "int64_col"] @@ -100,16 +119,16 @@ def test_dataframe_groupby_rolling_on(rolling_dfs): ) -def test_dataframe_groupby_rolling_on_invalid_column_raise_error(rolling_dfs): - bf_df, _ = rolling_dfs +def test_dataframe_groupby_rolling_on_invalid_column_raise_error(rows_rolling_dfs): + bf_df, _ = rows_rolling_dfs with pytest.raises(ValueError): bf_df.groupby(level=0).rolling(window=3, on="whatever").sum() @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) -def test_series_rolling_closed_param(rolling_series, closed): - bf_series, df_series = rolling_series +def test_series_rolling_closed_param(rows_rolling_series, closed): + bf_series, df_series = rows_rolling_series actual_result = bf_series.rolling(window=3, closed=closed).sum().to_pandas() @@ -118,8 +137,8 @@ def test_series_rolling_closed_param(rolling_series, closed): @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) -def test_series_groupby_rolling_closed_param(rolling_series, closed): - bf_series, df_series = rolling_series +def test_series_groupby_rolling_closed_param(rows_rolling_series, closed): + bf_series, df_series = rows_rolling_series actual_result = ( bf_series.groupby(bf_series % 2) @@ -159,8 +178,8 @@ def test_series_groupby_rolling_closed_param(rolling_series, closed): pytest.param(lambda x: x.var(), id="var"), ], ) -def test_series_window_agg_ops(rolling_series, windowing, agg_op): - bf_series, pd_series = rolling_series +def test_series_window_agg_ops(rows_rolling_series, windowing, agg_op): + bf_series, pd_series = rows_rolling_series actual_result = agg_op(windowing(bf_series)).to_pandas() @@ -205,3 +224,67 @@ def test_dataframe_window_agg_ops(scalars_dfs, windowing, agg_op): pd_result = agg_op(windowing(pd_df)) pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False) + + +@pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) +@pytest.mark.parametrize( + "window", # skipped numpy because Pandas does not support it. + [pd.Timedelta("3s"), datetime.timedelta(seconds=3), "3s"], +) +@pytest.mark.parametrize("ascending", [True, False]) +def test_series_range_rolling(range_rolling_dfs, window, closed, ascending): + bf_df, pd_df = range_rolling_dfs + bf_series = bf_df.set_index("ts_col")["int_col"] + pd_series = pd_df.set_index("ts_col")["int_col"] + + actual_result = ( + bf_series.sort_index(ascending=ascending) + .rolling(window=window, closed=closed) + .min() + .to_pandas() + ) + + expected_result = ( + pd_series.sort_index(ascending=ascending) + .rolling(window=window, closed=closed) + .min() + ) + pd.testing.assert_series_equal( + actual_result, expected_result, check_dtype=False, check_index=False + ) + + +def test_range_rolling_order_info_lookup(range_rolling_dfs): + bf_df, pd_df = range_rolling_dfs + + actual_result = ( + bf_df.set_index("ts_col") + .sort_index(ascending=False)["int_col"] + .rolling(window="3s") + .min() + .to_pandas() + ) + + expected_result = ( + pd_df.set_index("ts_col") + .sort_index(ascending=False)["int_col"] + .rolling(window="3s") + .min() + ) + pd.testing.assert_series_equal( + actual_result, expected_result, check_dtype=False, check_index=False + ) + + +def test_range_rolling_unsupported_index_type_raise_error(range_rolling_dfs): + bf_df, _ = range_rolling_dfs + + with pytest.raises(ValueError): + bf_df["int_col"].sort_index().rolling(window="3s") + + +def test_range_rolling_unsorted_column_raise_error(range_rolling_dfs): + bf_df, _ = range_rolling_dfs + + with pytest.raises(ValueError): + bf_df["int_col"].rolling(window="3s") From 6222c5e3f7439f89ce17acedea0fc5ca3416fd75 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 4 Apr 2025 00:37:06 +0000 Subject: [PATCH 04/15] update rewrite logic --- bigframes/core/rewrite/windows.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/bigframes/core/rewrite/windows.py b/bigframes/core/rewrite/windows.py index ffce7eb6be..99208e6686 100644 --- a/bigframes/core/rewrite/windows.py +++ b/bigframes/core/rewrite/windows.py @@ -20,14 +20,13 @@ from bigframes.core import nodes -def rewrite_range_rolling(root: nodes.BigFrameNode) -> nodes.BigFrameNode: - if isinstance(root, nodes.WindowOpNode): - return _rewrite_range_rolling_node(root) +def rewrite_range_rolling(node: nodes.BigFrameNode) -> nodes.BigFrameNode: + if not isinstance(node, nodes.WindowOpNode): + return node - return root + if node.window_spec.row_bounded: + return node - -def _rewrite_range_rolling_node(node: nodes.WindowOpNode) -> nodes.BigFrameNode: if len(node.window_spec.ordering) != 1: raise ValueError( "Range rolling should only be performed on exactly one column." From 850ea41b686089ab77b2ddc391a6e46649d8b49e Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 4 Apr 2025 00:48:51 +0000 Subject: [PATCH 05/15] fix mypy --- bigframes/core/compile/compiled.py | 12 +++++++----- bigframes/core/compile/polars/compiler.py | 7 +++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index f41f5d7882..90a21c23d8 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -548,9 +548,11 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec): window_spec.ordering, ) else: - order_by = _convert_range_ordering_to_table_value( - self._column_names, window_spec.ordering[0] - ) + order_by = [ + _convert_range_ordering_to_table_value( + self._column_names, window_spec.ordering[0] + ) + ] window = bigframes_vendored.ibis.window(order_by=order_by, group_by=group_by) if window_spec.bounds is not None: @@ -611,8 +613,8 @@ def _convert_range_ordering_to_table_value( ) if ordering_column.direction.is_ascending: - return bigframes_vendored.ibis.asc(expr) - return bigframes_vendored.ibis.desc(expr) + return bigframes_vendored.ibis.asc(expr) # type: ignore + return bigframes_vendored.ibis.desc(expr) # type: ignore def _string_cast_join_cond( diff --git a/bigframes/core/compile/polars/compiler.py b/bigframes/core/compile/polars/compiler.py index 6fac3c9b92..cd3343bf90 100644 --- a/bigframes/core/compile/polars/compiler.py +++ b/bigframes/core/compile/polars/compiler.py @@ -16,7 +16,7 @@ import dataclasses import functools import itertools -from typing import cast, Optional, Sequence, Tuple, TYPE_CHECKING, Union +from typing import cast, Optional, Sequence, Tuple, TYPE_CHECKING import bigframes.core from bigframes.core import window_spec @@ -360,6 +360,7 @@ def compile_window(self, node: nodes.WindowOpNode): return df.with_columns([agg_expr]) else: # row-bounded window + assert isinstance(window.bounds, window_spec.RowsWindowBounds) # Polars API semi-bounded, and any grouped rolling window challenging # https://github.com/pola-rs/polars/issues/4799 # https://github.com/pola-rs/polars/issues/8976 @@ -383,9 +384,7 @@ def compile_window(self, node: nodes.WindowOpNode): return pl.concat([df, results], how="horizontal") -def _get_period( - bounds: Union[window_spec.RowsWindowBounds, window_spec.RangeWindowBounds] -) -> Optional[int]: +def _get_period(bounds: window_spec.RowsWindowBounds) -> Optional[int]: """Returns None if the boundary is infinite.""" if bounds.start is None or bounds.end is None: return None From db6a3530a493eb91e4cae7806ccbda6214f00fcc Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 4 Apr 2025 03:18:39 +0000 Subject: [PATCH 06/15] differentiate range bound and unbounded window --- bigframes/core/array_value.py | 2 +- bigframes/core/compile/compiled.py | 29 +++++++++++++++++++--------- bigframes/core/nodes.py | 4 ++-- bigframes/core/rewrite/windows.py | 2 +- bigframes/core/window/rolling.py | 31 +++++++++++++++++------------- bigframes/core/window_spec.py | 24 +++++++++++++++++++++-- tests/system/small/test_window.py | 2 +- 7 files changed, 65 insertions(+), 29 deletions(-) diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py index 7ede7b7e65..b40f4d2f6b 100644 --- a/bigframes/core/array_value.py +++ b/bigframes/core/array_value.py @@ -412,7 +412,7 @@ def project_window_op( skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection """ # TODO: Support non-deterministic windowing - if window_spec.row_bounded or not op.order_independent: + if window_spec.is_row_bounded or not op.order_independent: if self.node.order_ambiguous and not self.session._strictly_ordered: if not self.session._allows_ambiguity: raise ValueError( diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 90a21c23d8..d00ed20cf5 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -463,6 +463,9 @@ def project_window_op( never_skip_nulls=never_skip_nulls, ) + if expression.op.order_independent and window_spec.is_unbounded: + # notably percentile_cont does not support ordering clause + window_spec = window_spec.without_order() window = self._ibis_window_from_spec(window_spec) bindings = {col: self._get_ibis_column(col) for col in self.column_ids} @@ -538,21 +541,29 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec): # 1. Order-independent op (aggregation, cut, rank) with unbound window - no ordering clause needed # 2. Order-independent op (aggregation, cut, rank) with range window - use ordering clause, ties allowed # 3. Order-depedenpent op (navigation functions, array_agg) or rows bounds - use total row order to break ties. - if not window_spec.ordering: - # If window spec has following or preceding bounds, we need to apply an unambiguous ordering. - raise ValueError("No ordering provided for ordered analytic function") + if window_spec.is_row_bounded: + if not window_spec.ordering: + # If window spec has following or preceding bounds, we need to apply an unambiguous ordering. + raise ValueError("No ordering provided for ordered analytic function") + order_by = _convert_row_ordering_to_table_values( + self._column_names, + window_spec.ordering, + ) - if window_spec.row_bounded: + elif window_spec.is_range_bounded: + order_by = _convert_range_ordering_to_table_value( + self._column_names, + window_spec.ordering[0], + ) + # The rest if branches are for unbounded windows + elif window_spec.ordering: + # Unbound grouping window. Suitable for aggregations but not for analytic function application. order_by = _convert_row_ordering_to_table_values( self._column_names, window_spec.ordering, ) else: - order_by = [ - _convert_range_ordering_to_table_value( - self._column_names, window_spec.ordering[0] - ) - ] + order_by = None window = bigframes_vendored.ibis.window(order_by=order_by, group_by=group_by) if window_spec.bounds is not None: diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index fbc43e033a..56cc23b133 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -1358,7 +1358,7 @@ def _validate(self): """Validate the local data in the node.""" # Since inner order and row bounds are coupled, rank ops can't be row bounded assert ( - not self.window_spec.row_bounded + not self.window_spec.is_row_bounded ) or self.expression.op.implicitly_inherits_order assert all(ref in self.child.ids for ref in self.expression.column_references) @@ -1420,7 +1420,7 @@ def inherits_order(self) -> bool: op_inherits_order = ( not self.expression.op.order_independent ) and self.expression.op.implicitly_inherits_order - return op_inherits_order or self.window_spec.row_bounded + return op_inherits_order or self.window_spec.is_row_bounded @property def additive_base(self) -> BigFrameNode: diff --git a/bigframes/core/rewrite/windows.py b/bigframes/core/rewrite/windows.py index 99208e6686..9f55db23af 100644 --- a/bigframes/core/rewrite/windows.py +++ b/bigframes/core/rewrite/windows.py @@ -24,7 +24,7 @@ def rewrite_range_rolling(node: nodes.BigFrameNode) -> nodes.BigFrameNode: if not isinstance(node, nodes.WindowOpNode): return node - if node.window_spec.row_bounded: + if not node.window_spec.is_range_bounded: return node if len(node.window_spec.ordering) != 1: diff --git a/bigframes/core/window/rolling.py b/bigframes/core/window/rolling.py index 01a3123288..3f7ee0e7c5 100644 --- a/bigframes/core/window/rolling.py +++ b/bigframes/core/window/rolling.py @@ -140,7 +140,7 @@ def create_range_window( if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE: raise ValueError("Index type should be timestamps with timezones") - order_direction = _find_ordering(block.expr.node, block.index_columns[0]) + order_direction = _find_order_direction(block.expr.node, block.index_columns[0]) if order_direction is None: raise ValueError( "The index might not be in a monotonic order. Please sort the index before rolling." @@ -160,7 +160,7 @@ def create_range_window( @singledispatch -def _find_ordering( +def _find_order_direction( root: nodes.BigFrameNode, column_id: str ) -> ordering.OrderingDirection | None: """Returns the order of the given column with tree traversal. If the column cannot be found, @@ -169,32 +169,37 @@ def _find_ordering( return None -@_find_ordering.register +@_find_order_direction.register def _(root: nodes.OrderByNode, column_id: str): - for order_expr in root.by: - scalar_expr = order_expr.scalar_expression - if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id: - return order_expr.direction + if len(root.by) == 0: + return None + + # Only when the column is used as the first ordering key + # does it guarantee that its values are in a monotonic order. + order_expr = root.by[0] + scalar_expr = order_expr.scalar_expression + if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id: + return order_expr.direction return None -@_find_ordering.register +@_find_order_direction.register def _(root: nodes.ReversedNode, column_id: str): - direction = _find_ordering(root.child, column_id) + direction = _find_order_direction(root.child, column_id) if direction is None: return None return direction.reverse() -@_find_ordering.register +@_find_order_direction.register def _(root: nodes.SelectionNode, column_id: str): for alias_ref in root.input_output_pairs: if alias_ref.id.name == column_id: - return _find_ordering(root.child, alias_ref.ref.id.name) + return _find_order_direction(root.child, alias_ref.ref.id.name) -@_find_ordering.register +@_find_order_direction.register def _(root: nodes.FilterNode, column_id: str): - return _find_ordering(root.child, column_id) + return _find_order_direction(root.child, column_id) diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index ef54e64dc3..169f9f2705 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -240,7 +240,7 @@ class WindowSpec: min_periods: int = 0 @property - def row_bounded(self): + def is_row_bounded(self): """ Whether the window is bounded by row offsets. @@ -249,6 +249,26 @@ def row_bounded(self): """ return isinstance(self.bounds, RowsWindowBounds) + @property + def is_range_bounded(self): + """ + Whether the window is bounded by range offsets. + + This is relevant for determining whether the window requires a total order + to calculate deterministically. + """ + return isinstance(self.bounds, RangeWindowBounds) + + @property + def is_unbounded(self): + """ + Whether the window is unbounded. + + This is relevant for determining whether the window requires a total order + to calculate deterministically. + """ + return self.bounds is None + @property def all_referenced_columns(self) -> Set[ids.ColumnId]: """ @@ -261,7 +281,7 @@ def all_referenced_columns(self) -> Set[ids.ColumnId]: def without_order(self) -> WindowSpec: """Removes ordering clause if ordering isn't required to define bounds.""" - if self.row_bounded: + if self.is_row_bounded: raise ValueError("Cannot remove order from row-bounded window") return replace(self, ordering=()) diff --git a/tests/system/small/test_window.py b/tests/system/small/test_window.py index ed90b632d4..1502a20ef8 100644 --- a/tests/system/small/test_window.py +++ b/tests/system/small/test_window.py @@ -228,7 +228,7 @@ def test_dataframe_window_agg_ops(scalars_dfs, windowing, agg_op): @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) @pytest.mark.parametrize( - "window", # skipped numpy because Pandas does not support it. + "window", # skipped numpy timedelta because Pandas does not support it. [pd.Timedelta("3s"), datetime.timedelta(seconds=3), "3s"], ) @pytest.mark.parametrize("ascending", [True, False]) From 1928992ddb854e4836bfb4d1f9f2a2fbb2e8413c Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 4 Apr 2025 03:27:43 +0000 Subject: [PATCH 07/15] fix mypy --- bigframes/core/compile/compiled.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index d00ed20cf5..6af116942d 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -551,10 +551,12 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec): ) elif window_spec.is_range_bounded: - order_by = _convert_range_ordering_to_table_value( - self._column_names, - window_spec.ordering[0], - ) + order_by = [ + _convert_range_ordering_to_table_value( + self._column_names, + window_spec.ordering[0], + ) + ] # The rest if branches are for unbounded windows elif window_spec.ordering: # Unbound grouping window. Suitable for aggregations but not for analytic function application. From ad53e3dfaeb41efb52d8d17fb474c6542828b293 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 7 Apr 2025 17:22:06 +0000 Subject: [PATCH 08/15] relax sorting key search algo --- bigframes/core/window/rolling.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/bigframes/core/window/rolling.py b/bigframes/core/window/rolling.py index 3f7ee0e7c5..c73f2d07d7 100644 --- a/bigframes/core/window/rolling.py +++ b/bigframes/core/window/rolling.py @@ -174,12 +174,10 @@ def _(root: nodes.OrderByNode, column_id: str): if len(root.by) == 0: return None - # Only when the column is used as the first ordering key - # does it guarantee that its values are in a monotonic order. - order_expr = root.by[0] - scalar_expr = order_expr.scalar_expression - if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id: - return order_expr.direction + for order_expr in root.by: + scalar_expr = order_expr.scalar_expression + if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id: + return order_expr.direction return None From a2428247b92bbf32985cb4235096fbd5e7f90ae1 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 7 Apr 2025 19:14:48 +0000 Subject: [PATCH 09/15] check whether window key is prefix --- bigframes/core/window/rolling.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/bigframes/core/window/rolling.py b/bigframes/core/window/rolling.py index c73f2d07d7..9594d6816b 100644 --- a/bigframes/core/window/rolling.py +++ b/bigframes/core/window/rolling.py @@ -174,10 +174,11 @@ def _(root: nodes.OrderByNode, column_id: str): if len(root.by) == 0: return None - for order_expr in root.by: - scalar_expr = order_expr.scalar_expression - if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id: - return order_expr.direction + # Make sure the window key is the prefix of sorting keys. + order_expr = root.by[0] + scalar_expr = order_expr.scalar_expression + if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id: + return order_expr.direction return None From 2e86a47f866fcf22a90bf7ff69ef48fbb6a59011 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 7 Apr 2025 22:57:10 +0000 Subject: [PATCH 10/15] address comments --- bigframes/core/compile/compiled.py | 22 ++++++++- bigframes/core/window/ordering.py | 74 ++++++++++++++++++++++++++++++ bigframes/core/window/rolling.py | 53 ++------------------- bigframes/core/window_spec.py | 13 ------ tests/system/small/test_window.py | 6 ++- 5 files changed, 103 insertions(+), 65 deletions(-) create mode 100644 bigframes/core/window/ordering.py diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 1398cea8eb..6202a34ce2 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -29,6 +29,7 @@ from google.cloud import bigquery import pyarrow as pa +from bigframes.core import utils import bigframes.core.compile.aggregate_compiler as agg_compiler import bigframes.core.compile.googlesql import bigframes.core.compile.ibis_types @@ -597,6 +598,17 @@ def _convert_range_ordering_to_table_value( value_lookup: typing.Mapping[str, ibis_types.Value], ordering_column: OrderingExpression, ) -> ibis_types.Value: + """Converts the ordering for range windows to Ibis references. + + Note that this method is different from `_convert_row_ordering_to_table_values` in + that it does not arrange null values. There are two reasons: + 1. Manipulating null positions requires more than one ordering key, which is forbidden + by SQL window syntax for range rolling. + 2. Pandas does not allow range rolling on timeseries with nulls. + + Therefore, we opt for the simplest approach here: generate the simplest SQL and follow + the BigQuery engine behavior. + """ expr = op_compiler.compile_expression( ordering_column.scalar_expression, value_lookup ) @@ -695,8 +707,14 @@ def _add_boundary( ) -> ibis_expr_builders.LegacyWindowBuilder: if isinstance(bounds, RangeWindowBounds): return ibis_window.range( - start=_to_ibis_boundary(bounds.start_micros), - end=_to_ibis_boundary(bounds.end_micros), + start=_to_ibis_boundary( + None + if bounds.start is None + else utils.timedelta_to_micros(bounds.start) + ), + end=_to_ibis_boundary( + None if bounds.end is None else utils.timedelta_to_micros(bounds.end) + ), ) if isinstance(bounds, RowsWindowBounds): if bounds.start is not None or bounds.end is not None: diff --git a/bigframes/core/window/ordering.py b/bigframes/core/window/ordering.py new file mode 100644 index 0000000000..dadb4ee9b9 --- /dev/null +++ b/bigframes/core/window/ordering.py @@ -0,0 +1,74 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from functools import singledispatch + +from bigframes.core import expression as ex +from bigframes.core import nodes, ordering + + +@singledispatch +def find_order_direction( + root: nodes.BigFrameNode, column_id: str +) -> ordering.OrderingDirection | None: + """Returns the order of the given column with tree traversal. If the column cannot be found, + or the ordering information is not available, return None. + """ + return None + + +@find_order_direction.register +def _(root: nodes.OrderByNode, column_id: str): + if len(root.by) == 0: + # This is a no-op + return find_order_direction(root.child, column_id) + + # Make sure the window key is the prefix of sorting keys. + order_expr = root.by[0] + scalar_expr = order_expr.scalar_expression + if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id: + return order_expr.direction + + return None + + +@find_order_direction.register +def _(root: nodes.ReversedNode, column_id: str): + direction = find_order_direction(root.child, column_id) + + if direction is None: + return None + return direction.reverse() + + +@find_order_direction.register +def _(root: nodes.SelectionNode, column_id: str): + for alias_ref in root.input_output_pairs: + if alias_ref.id.name == column_id: + return find_order_direction(root.child, alias_ref.ref.id.name) + + +@find_order_direction.register +def _(root: nodes.FilterNode, column_id: str): + return find_order_direction(root.child, column_id) + + +@find_order_direction.register +def _(root: nodes.InNode, column_id: str): + return find_order_direction(root.left_child, column_id) + + +@find_order_direction.register +def _(root: nodes.WindowOpNode, column_id: str): + return find_order_direction(root.child, column_id) diff --git a/bigframes/core/window/rolling.py b/bigframes/core/window/rolling.py index 9594d6816b..b4636bfc8d 100644 --- a/bigframes/core/window/rolling.py +++ b/bigframes/core/window/rolling.py @@ -15,7 +15,6 @@ from __future__ import annotations import datetime -from functools import singledispatch import typing import bigframes_vendored.pandas.core.window.rolling as vendored_pandas_rolling @@ -24,8 +23,9 @@ from bigframes import dtypes from bigframes.core import expression as ex -from bigframes.core import log_adapter, nodes, ordering, window_spec +from bigframes.core import log_adapter, ordering, window_spec import bigframes.core.blocks as blocks +from bigframes.core.window import ordering as window_ordering import bigframes.operations.aggregations as agg_ops @@ -140,7 +140,9 @@ def create_range_window( if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE: raise ValueError("Index type should be timestamps with timezones") - order_direction = _find_order_direction(block.expr.node, block.index_columns[0]) + order_direction = window_ordering.find_order_direction( + block.expr.node, block.index_columns[0] + ) if order_direction is None: raise ValueError( "The index might not be in a monotonic order. Please sort the index before rolling." @@ -157,48 +159,3 @@ def create_range_window( ), ) return Window(block, spec, block.value_columns, is_series=is_series) - - -@singledispatch -def _find_order_direction( - root: nodes.BigFrameNode, column_id: str -) -> ordering.OrderingDirection | None: - """Returns the order of the given column with tree traversal. If the column cannot be found, - or the ordering information is not available, return None. - """ - return None - - -@_find_order_direction.register -def _(root: nodes.OrderByNode, column_id: str): - if len(root.by) == 0: - return None - - # Make sure the window key is the prefix of sorting keys. - order_expr = root.by[0] - scalar_expr = order_expr.scalar_expression - if isinstance(scalar_expr, ex.DerefOp) and scalar_expr.id.name == column_id: - return order_expr.direction - - return None - - -@_find_order_direction.register -def _(root: nodes.ReversedNode, column_id: str): - direction = _find_order_direction(root.child, column_id) - - if direction is None: - return None - return direction.reverse() - - -@_find_order_direction.register -def _(root: nodes.SelectionNode, column_id: str): - for alias_ref in root.input_output_pairs: - if alias_ref.id.name == column_id: - return _find_order_direction(root.child, alias_ref.ref.id.name) - - -@_find_order_direction.register -def _(root: nodes.FilterNode, column_id: str): - return _find_order_direction(root.child, column_id) diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 169f9f2705..d08ba3d12a 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -21,7 +21,6 @@ import numpy as np import pandas as pd -from bigframes.core import utils import bigframes.core.expression as ex import bigframes.core.identifiers as ids import bigframes.core.ordering as orderings @@ -199,18 +198,6 @@ def from_timedelta_window( else: raise ValueError(f"Unsupported value for 'closed' parameter: {closed}") - @property - def start_micros(self) -> int | None: - if self.start is None: - return None - return utils.timedelta_to_micros(self.start) - - @property - def end_micros(self) -> int | None: - if self.end is None: - return None - return utils.timedelta_to_micros(self.end) - def __post_init__(self): if self.start is None: return diff --git a/tests/system/small/test_window.py b/tests/system/small/test_window.py index 1502a20ef8..fe233c3a7a 100644 --- a/tests/system/small/test_window.py +++ b/tests/system/small/test_window.py @@ -260,16 +260,18 @@ def test_range_rolling_order_info_lookup(range_rolling_dfs): actual_result = ( bf_df.set_index("ts_col") .sort_index(ascending=False)["int_col"] + .isin(bf_df["int_col"]) .rolling(window="3s") - .min() + .count() .to_pandas() ) expected_result = ( pd_df.set_index("ts_col") .sort_index(ascending=False)["int_col"] + .isin(pd_df["int_col"]) .rolling(window="3s") - .min() + .count() ) pd.testing.assert_series_equal( actual_result, expected_result, check_dtype=False, check_index=False From 9a835cb0663ee7bc3fc60305d6710a5db3536c21 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 7 Apr 2025 23:07:13 +0000 Subject: [PATCH 11/15] fix type hint --- bigframes/core/window/ordering.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bigframes/core/window/ordering.py b/bigframes/core/window/ordering.py index dadb4ee9b9..5942802c27 100644 --- a/bigframes/core/window/ordering.py +++ b/bigframes/core/window/ordering.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + from functools import singledispatch from bigframes.core import expression as ex From ea592174494deb53e961f9348ae22914499f5b12 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 7 Apr 2025 23:14:31 +0000 Subject: [PATCH 12/15] add comment on skipping range window during order pull up --- bigframes/core/nodes.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index ad83a65de5..6730e20a7f 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -1415,6 +1415,8 @@ def inherits_order(self) -> bool: op_inherits_order = ( not self.expression.op.order_independent ) and self.expression.op.implicitly_inherits_order + # range-bounded windows do not inherit orders because their ordering are + # already defined before rewrite time. return op_inherits_order or self.window_spec.is_row_bounded @property From 0bdaf177d3bb0abda5731ef13b922485fe71511e Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 7 Apr 2025 23:17:30 +0000 Subject: [PATCH 13/15] fix lint --- bigframes/core/nodes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 6730e20a7f..a3f3613e4e 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -1415,7 +1415,7 @@ def inherits_order(self) -> bool: op_inherits_order = ( not self.expression.op.order_independent ) and self.expression.op.implicitly_inherits_order - # range-bounded windows do not inherit orders because their ordering are + # range-bounded windows do not inherit orders because their ordering are # already defined before rewrite time. return op_inherits_order or self.window_spec.is_row_bounded From cd6a6667211a11493fadabd1f2d48abf92f623ca Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Tue, 8 Apr 2025 00:40:49 +0000 Subject: [PATCH 14/15] check order for all AdditiveNode --- bigframes/core/window/ordering.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/bigframes/core/window/ordering.py b/bigframes/core/window/ordering.py index 5942802c27..26878e7f07 100644 --- a/bigframes/core/window/ordering.py +++ b/bigframes/core/window/ordering.py @@ -67,10 +67,5 @@ def _(root: nodes.FilterNode, column_id: str): @find_order_direction.register -def _(root: nodes.InNode, column_id: str): - return find_order_direction(root.left_child, column_id) - - -@find_order_direction.register -def _(root: nodes.WindowOpNode, column_id: str): - return find_order_direction(root.child, column_id) +def _(root: nodes.AdditiveNode, column_id: str): + return find_order_direction(root.additive_base, column_id) From 4b4c379545ae5e7992ff1d3cd5250d247e89d0d5 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Tue, 8 Apr 2025 00:53:56 +0000 Subject: [PATCH 15/15] explicitly dispatch additive nodes --- bigframes/core/window/ordering.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/bigframes/core/window/ordering.py b/bigframes/core/window/ordering.py index 26878e7f07..6ab66cf4d8 100644 --- a/bigframes/core/window/ordering.py +++ b/bigframes/core/window/ordering.py @@ -67,5 +67,15 @@ def _(root: nodes.FilterNode, column_id: str): @find_order_direction.register -def _(root: nodes.AdditiveNode, column_id: str): - return find_order_direction(root.additive_base, column_id) +def _(root: nodes.InNode, column_id: str): + return find_order_direction(root.left_child, column_id) + + +@find_order_direction.register +def _(root: nodes.WindowOpNode, column_id: str): + return find_order_direction(root.child, column_id) + + +@find_order_direction.register +def _(root: nodes.ProjectionNode, column_id: str): + return find_order_direction(root.child, column_id)