From b3771b81de8322e2034af28326e392b15815d5a2 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 28 May 2024 21:53:54 +0000 Subject: [PATCH 01/14] feat: Add Series.peek to preview data efficiently --- bigframes/core/blocks.py | 6 +-- bigframes/core/pruning.py | 64 +++++++++++++++++++++++++++++++ bigframes/core/tree_properties.py | 26 ++++++++++++- bigframes/operations/__init__.py | 5 +++ bigframes/series.py | 42 ++++++++++++++++++-- bigframes/session/__init__.py | 21 ++++++++-- bigframes/session/planner.py | 62 ++++++++++++++++++++++++++++++ tests/system/small/test_series.py | 36 +++++++++++++++++ tests/unit/test_planner.py | 57 +++++++++++++++++++++++++++ 9 files changed, 308 insertions(+), 11 deletions(-) create mode 100644 bigframes/core/pruning.py create mode 100644 bigframes/session/planner.py create mode 100644 tests/unit/test_planner.py diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 010eb96f75..e5620b6bb0 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2054,13 +2054,13 @@ def to_sql_query( idx_labels, ) - def cached(self, *, optimize_offsets=False, force: bool = False) -> None: + def cached(self, *, force: bool = False, session_aware: bool = False) -> None: """Write the block to a session table.""" # use a heuristic for whether something needs to be cached if (not force) and self.session._is_trivially_executable(self.expr): return - if optimize_offsets: - self.session._cache_with_offsets(self.expr) + elif session_aware: + self.session._session_aware_caching(self.expr) else: self.session._cache_with_cluster_cols( self.expr, cluster_cols=self.index_columns diff --git a/bigframes/core/pruning.py b/bigframes/core/pruning.py new file mode 100644 index 0000000000..25a6780090 --- /dev/null +++ b/bigframes/core/pruning.py @@ -0,0 +1,64 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Sequence + +import bigframes.core.expression as ex +import bigframes.operations as ops + +COMPARISON_OP_TYPES = tuple( + type(i) + for i in ( + ops.eq_op, + ops.eq_null_match_op, + ops.ne_op, + ops.gt_op, + ops.ge_op, + ops.lt_op, + ops.le_op, + ) +) + + +def cluster_cols_for_predicate(predicate: ex.Expression) -> Sequence[str]: + """Try to determine cluster col candidates that work with given predicates.""" + if isinstance(predicate, ex.UnboundVariableExpression): + return [predicate.id] + if isinstance(predicate, ex.OpExpression): + op = predicate.op + if isinstance(op, COMPARISON_OP_TYPES): + return cluster_cols_for_comparison(predicate.inputs[0], predicate.inputs[1]) + if isinstance(op, (type(ops.invert_op))): + return cluster_cols_for_predicate(predicate.inputs[0]) + if isinstance(op, (type(ops.and_op), type(ops.or_op))): + left_cols = cluster_cols_for_predicate(predicate.inputs[0]) + right_cols = cluster_cols_for_predicate(predicate.inputs[1]) + return [*left_cols, *[col for col in right_cols if col not in left_cols]] + else: + return [] + else: + # Constant + return [] + + +def cluster_cols_for_comparison( + left_ex: ex.Expression, right_ex: ex.Expression +) -> Sequence[str]: + if left_ex.is_const: + if isinstance(right_ex, ex.UnboundVariableExpression): + return [right_ex.id] + elif right_ex.is_const: + if isinstance(left_ex, ex.UnboundVariableExpression): + return [left_ex.id] + return [] diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py index 2847a8f7f1..7e63382203 100644 --- a/bigframes/core/tree_properties.py +++ b/bigframes/core/tree_properties.py @@ -15,7 +15,7 @@ import functools import itertools -from typing import Callable, Dict, Optional +from typing import Callable, Dict, Optional, Sequence import bigframes.core.nodes as nodes @@ -91,6 +91,30 @@ def _node_counts_inner( ) +def count_nodes(forest: Sequence[nodes.BigFrameNode]) -> dict[nodes.BigFrameNode, int]: + def _combine_counts( + left: Dict[nodes.BigFrameNode, int], right: Dict[nodes.BigFrameNode, int] + ) -> Dict[nodes.BigFrameNode, int]: + return { + key: left.get(key, 0) + right.get(key, 0) + for key in itertools.chain(left.keys(), right.keys()) + } + + empty_counts: Dict[nodes.BigFrameNode, int] = {} + + @functools.cache + def _node_counts_inner( + subtree: nodes.BigFrameNode, + ) -> Dict[nodes.BigFrameNode, int]: + """Helper function to count occurences of duplicate nodes in a subtree. Considers only nodes in a complexity range""" + child_counts = [_node_counts_inner(child) for child in subtree.child_nodes] + node_counts = functools.reduce(_combine_counts, child_counts, empty_counts) + return _combine_counts(node_counts, {subtree: 1}) + + counts = [_node_counts_inner(root) for root in forest] + return functools.reduce(_combine_counts, counts, empty_counts) + + def replace_nodes( root: nodes.BigFrameNode, replacements: dict[nodes.BigFrameNode, nodes.BigFrameNode], diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index fe9fe6df20..c55580724a 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -60,6 +60,11 @@ def order_preserving(self) -> bool: """Whether the row operation preserves total ordering. Can be pruned from ordering expressions.""" return False + @property + def pruning_compatible(self) -> bool: + """Whether the operation preserves locality o""" + return False + @dataclasses.dataclass(frozen=True) class NaryOp(ScalarOp): diff --git a/bigframes/series.py b/bigframes/series.py index 3f1fa4c3a5..36ba807aa1 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -617,6 +617,39 @@ def head(self, n: int = 5) -> Series: def tail(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[-n:]) + def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame: + """ + Preview n arbitrary elements from the series. No guarantees about row selection or ordering. + ``Series.peek(force=False)`` will always be very fast, but will not succeed if data requires + full data scanning. Using ``force=True`` will always succeed, but may be perform queries. + Query results will be cached so that future steps will benefit from these queries. + + Args: + n (int, default 5): + The number of rows to select from the series. Which N rows are returned is non-deterministic. + force (bool, default True): + If the data cannot be peeked efficiently, the series will instead be fully materialized as part + of the operation if ``force=True``. If ``force=False``, the operation will throw a ValueError. + Returns: + pandas.Series: A pandas Series with n rows. + + Raises: + ValueError: If force=False and data cannot be efficiently peeked. + """ + maybe_result = self._block.try_peek(n) + if maybe_result is None: + if force: + self._cached() + maybe_result = self._block.try_peek(n, force=True) + assert maybe_result is not None + else: + raise ValueError( + "Cannot peek efficiently when data has aggregates, joins or window functions applied. Use force=True to fully compute dataframe." + ) + as_series = maybe_result.squeeze(axis=1) + as_series.name = self.name + return as_series + def nlargest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") @@ -1400,7 +1433,7 @@ def apply( # return Series with materialized result so that any error in the remote # function is caught early - materialized_series = result_series._cached() + materialized_series = result_series._cached(session_aware=False) return materialized_series def combine( @@ -1775,10 +1808,11 @@ def cache(self): Returns: Series: Self """ - return self._cached(force=True) + # Do not use session-aware cashing if user-requested + return self._cached(force=True, session_aware=False) - def _cached(self, *, force: bool = True) -> Series: - self._block.cached(force=force) + def _cached(self, *, force: bool = True, session_aware: bool = True) -> Series: + self._block.cached(force=force, session_aware=session_aware) return self def _optimize_query_complexity(self): diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 83481a3ae9..15d4175270 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -16,7 +16,6 @@ from __future__ import annotations -import collections.abc import copy import datetime import logging @@ -85,6 +84,7 @@ import bigframes.core.nodes as nodes from bigframes.core.ordering import IntegerEncoding import bigframes.core.ordering as order +import bigframes.core.pruning import bigframes.core.tree_properties as traversals import bigframes.core.tree_properties as tree_properties import bigframes.core.utils as utils @@ -100,6 +100,7 @@ import bigframes.session._io.bigquery as bf_io_bigquery import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table import bigframes.session.clients +import bigframes.session.planner import bigframes.version # Avoid circular imports. @@ -326,13 +327,15 @@ def session_id(self): @property def objects( self, - ) -> collections.abc.Set[ + ) -> Tuple[ Union[ bigframes.core.indexes.Index, bigframes.series.Series, dataframe.DataFrame ] ]: + still_alive = [i for i in self._objects if i() is not None] + self._objects = still_alive # Create a set with strong references, be careful not to hold onto this needlessly, as will prevent garbage collection. - return set(i() for i in self._objects if i() is not None) # type: ignore + return tuple(i() for i in self._objects if i() is not None) # type: ignore @property def _project(self): @@ -1913,6 +1916,18 @@ def _cache_with_offsets(self, array_value: core.ArrayValue): ).node self._cached_executions[array_value.node] = cached_replacement + def _session_aware_caching(self, array_value: core.ArrayValue) -> None: + # this is the occurence count across the whole session + forest = [obj._block.expr.node for obj in self.objects] + # These node types are cheap to re-compute + target, cluster_col = bigframes.session.planner.session_aware_cache_plan( + array_value.node, forest + ) + if cluster_col: + self._cache_with_cluster_cols(core.ArrayValue(target), [cluster_col]) + else: + self._cache_with_offsets(core.ArrayValue(target)) + def _simplify_with_caching(self, array_value: core.ArrayValue): """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" # Apply existing caching first diff --git a/bigframes/session/planner.py b/bigframes/session/planner.py new file mode 100644 index 0000000000..9f266e3f1f --- /dev/null +++ b/bigframes/session/planner.py @@ -0,0 +1,62 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import Optional, Sequence, Tuple + +import bigframes.core.expression as ex +import bigframes.core.nodes as nodes +import bigframes.core.pruning as predicate_pruning +import bigframes.core.tree_properties as traversals + + +def session_aware_cache_plan( + root: nodes.BigFrameNode, session_forest: Sequence[nodes.BigFrameNode] +) -> Tuple[nodes.BigFrameNode, Optional[str]]: + """ + Determines the best node to cache given a target and a list of object roots for objects in a session. + + Returns the node to cache, and optionally a clustering column. + """ + node_counts = traversals.count_nodes(session_forest) + # These node types are cheap to re-compute + de_cachable_types = (nodes.FilterNode, nodes.ProjectionNode) + caching_target = cur_node = root + caching_target_refs = node_counts.get(caching_target, 0) + + filters: list[ + ex.Expression + ] = [] # accumulate filters into this as traverse downwards + cluster_col: Optional[str] = None + while isinstance(cur_node, de_cachable_types): + if isinstance(cur_node, nodes.FilterNode): + filters.append(cur_node.predicate) + elif isinstance(cur_node, nodes.ProjectionNode): + bindings = {name: expr for expr, name in cur_node.assignments} + filters = [i.bind_all_variables(bindings) for i in filters] + + cur_node = cur_node.child + cur_node_refs = node_counts.get(cur_node, 0) + if cur_node_refs > caching_target_refs: + caching_target, caching_target_refs = cur_node, cur_node_refs + cluster_col = None + # Just pick the first cluster-compatible predicate + for predicate in filters: + # Cluster cols only consider the target object and not other sesssion objects + cluster_cols = predicate_pruning.cluster_cols_for_predicate(predicate) + if len(cluster_cols) > 0: + cluster_col = cluster_cols[0] + continue + return caching_target, cluster_col diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index dbc8ddec6f..85892e5a4d 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -1936,6 +1936,42 @@ def test_head_then_series_operation(scalars_dfs): ) +def test_series_peek(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + peek_result = scalars_df["float64_col"].peek(n=3, force=False) + pd.testing.assert_series_equal( + peek_result, + scalars_pandas_df["float64_col"].reindex_like(peek_result), + ) + + +def test_series_peek_filtered(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + peek_result = scalars_df[scalars_df.int64_col > 0]["float64_col"].peek( + n=3, force=False + ) + pd_result = scalars_pandas_df[scalars_pandas_df.int64_col > 0]["float64_col"] + pd.testing.assert_series_equal( + peek_result, + pd_result.reindex_like(peek_result), + ) + + +@skip_legacy_pandas +def test_series_peek_force(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + + cumsum_df = scalars_df[["int64_col", "int64_too"]].cumsum() + df_filtered = cumsum_df[cumsum_df.int64_col > 0]["int64_too"] + peek_result = df_filtered.peek(n=3, force=True) + pd_cumsum_df = scalars_pandas_df[["int64_col", "int64_too"]].cumsum() + pd_result = pd_cumsum_df[pd_cumsum_df.int64_col > 0]["int64_too"] + pd.testing.assert_series_equal( + peek_result, + pd_result.reindex_like(peek_result), + ) + + def test_shift(scalars_df_index, scalars_pandas_df_index): col_name = "int64_col" bf_result = scalars_df_index[col_name].shift().to_pandas() diff --git a/tests/unit/test_planner.py b/tests/unit/test_planner.py new file mode 100644 index 0000000000..acaa626e17 --- /dev/null +++ b/tests/unit/test_planner.py @@ -0,0 +1,57 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import ibis +import pandas as pd + +import bigframes.core as core +import bigframes.core.expression as ex +import bigframes.core.ordering as bf_order +import bigframes.operations as ops +import bigframes.session.planner as planner + +ibis_table = ibis.memtable({"col_a": [1, 2, 3], "col_b": [4, 5, 6]}) +cols = [ibis_table["col_a"], ibis_table["col_b"]] +LEAF: core.ArrayValue = core.ArrayValue.from_ibis( + session=None, # type: ignore + table=ibis_table, + columns=cols, + hidden_ordering_columns=[], + ordering=bf_order.ExpressionOrdering((bf_order.ascending_over("col_a"),)), +) + + +def test_session_aware_caching_project_filter(): + session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] + target = LEAF.assign_constant("col_c", 4, pd.Int64Dtype()).filter( + ops.gt_op.as_expr("col_a", ex.const(3)) + ) + result, cluster_col = planner.session_aware_cache_plan( + target.node, [obj.node for obj in session_objects] + ) + assert result == LEAF.node + assert cluster_col == "col_a" + + +def test_session_aware_caching_unusable_filter(): + session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] + target = LEAF.assign_constant("col_c", 4, pd.Int64Dtype()).filter( + ops.gt_op.as_expr("col_a", "col_b") + ) + result, cluster_col = planner.session_aware_cache_plan( + target.node, [obj.node for obj in session_objects] + ) + assert result == LEAF.node + assert cluster_col is None From f227476f4a93c0c61e3de5760b8fe87de7cbc4c8 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 30 May 2024 17:20:07 +0000 Subject: [PATCH 02/14] add another test --- bigframes/core/pruning.py | 2 ++ tests/unit/test_planner.py | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/bigframes/core/pruning.py b/bigframes/core/pruning.py index 25a6780090..5342f0662f 100644 --- a/bigframes/core/pruning.py +++ b/bigframes/core/pruning.py @@ -33,6 +33,7 @@ def cluster_cols_for_predicate(predicate: ex.Expression) -> Sequence[str]: """Try to determine cluster col candidates that work with given predicates.""" + # TODO: Prioritize equality predicates over ranges if isinstance(predicate, ex.UnboundVariableExpression): return [predicate.id] if isinstance(predicate, ex.OpExpression): @@ -56,6 +57,7 @@ def cluster_cols_for_comparison( left_ex: ex.Expression, right_ex: ex.Expression ) -> Sequence[str]: if left_ex.is_const: + # There are some invertible ops that would also be ok if isinstance(right_ex, ex.UnboundVariableExpression): return [right_ex.id] elif right_ex.is_const: diff --git a/tests/unit/test_planner.py b/tests/unit/test_planner.py index acaa626e17..5ef52deb22 100644 --- a/tests/unit/test_planner.py +++ b/tests/unit/test_planner.py @@ -55,3 +55,24 @@ def test_session_aware_caching_unusable_filter(): ) assert result == LEAF.node assert cluster_col is None + + +def test_session_aware_caching_fork_after_window_op(): + other = LEAF.promote_offsets("offsets_col").assign_constant( + "col_d", 5, pd.Int64Dtype() + ) + target = ( + LEAF.promote_offsets("offsets_col") + .assign_constant("col_c", 4, pd.Int64Dtype()) + .filter( + ops.eq_op.as_expr("col_a", ops.add_op.as_expr(ex.const(4), ex.const(3))) + ) + ) + result, cluster_col = planner.session_aware_cache_plan( + target.node, + [ + other.node, + ], + ) + assert result == LEAF.promote_offsets("offsets_col").node + assert cluster_col == "col_a" From a17e02748fd4f3abc2100b20faff8aa3ca8c6e00 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 31 May 2024 19:49:34 +0000 Subject: [PATCH 03/14] cleanup comments --- bigframes/core/pruning.py | 4 +++- bigframes/core/tree_properties.py | 13 +++++++++++++ bigframes/operations/__init__.py | 5 ----- bigframes/series.py | 3 ++- bigframes/session/__init__.py | 2 +- 5 files changed, 19 insertions(+), 8 deletions(-) diff --git a/bigframes/core/pruning.py b/bigframes/core/pruning.py index 5342f0662f..90a78a8fb3 100644 --- a/bigframes/core/pruning.py +++ b/bigframes/core/pruning.py @@ -33,7 +33,7 @@ def cluster_cols_for_predicate(predicate: ex.Expression) -> Sequence[str]: """Try to determine cluster col candidates that work with given predicates.""" - # TODO: Prioritize equality predicates over ranges + # TODO: Prioritize based on predicted selectivity (eg. equality conditions are probably very selective) if isinstance(predicate, ex.UnboundVariableExpression): return [predicate.id] if isinstance(predicate, ex.OpExpression): @@ -56,6 +56,8 @@ def cluster_cols_for_predicate(predicate: ex.Expression) -> Sequence[str]: def cluster_cols_for_comparison( left_ex: ex.Expression, right_ex: ex.Expression ) -> Sequence[str]: + # TODO: Try to normalize expressions such that one side is a single variable. + # eg. Convert -cola>=3 to cola<-3 and colb+3 < 4 to colb < 1 if left_ex.is_const: # There are some invertible ops that would also be ok if isinstance(right_ex, ex.UnboundVariableExpression): diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py index 7e63382203..846cf50d77 100644 --- a/bigframes/core/tree_properties.py +++ b/bigframes/core/tree_properties.py @@ -92,6 +92,19 @@ def _node_counts_inner( def count_nodes(forest: Sequence[nodes.BigFrameNode]) -> dict[nodes.BigFrameNode, int]: + """ + Counts the number of instances of each subtree present within a forest. + + Memoizes internally to accelerate execution, but cache not persisted (not reused between invocations). + + Args: + forest (Sequence of BigFrameNode): + The roots of each tree in the forest + + Returns: + dict[BigFramesNode, int]: The number of occurences of each subtree. + """ + def _combine_counts( left: Dict[nodes.BigFrameNode, int], right: Dict[nodes.BigFrameNode, int] ) -> Dict[nodes.BigFrameNode, int]: diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index 4b1dfeaa3c..42f83913ee 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -60,11 +60,6 @@ def order_preserving(self) -> bool: """Whether the row operation preserves total ordering. Can be pruned from ordering expressions.""" return False - @property - def pruning_compatible(self) -> bool: - """Whether the operation preserves locality o""" - return False - @dataclasses.dataclass(frozen=True) class NaryOp(ScalarOp): diff --git a/bigframes/series.py b/bigframes/series.py index 56591ef34d..069c9de360 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -619,7 +619,8 @@ def tail(self, n: int = 5) -> Series: def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame: """ - Preview n arbitrary elements from the series. No guarantees about row selection or ordering. + Preview n arbitrary elements from the series without guarantees about row selection or ordering. + ``Series.peek(force=False)`` will always be very fast, but will not succeed if data requires full data scanning. Using ``force=True`` will always succeed, but may be perform queries. Query results will be cached so that future steps will benefit from these queries. diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index f13fc7ef60..30d6b20b14 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -333,7 +333,7 @@ def session_id(self): @property def objects( self, - ) -> Tuple[ + ) -> Iterable[ Union[ bigframes.core.indexes.Index, bigframes.series.Series, dataframe.DataFrame ] From 5ff4661d1692138145c3ab5bc2ed36b4d0866dde Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 4 Jun 2024 19:46:23 +0000 Subject: [PATCH 04/14] more comments, up to 4 cluster cols for session-based caching --- bigframes/core/blocks.py | 2 +- bigframes/core/pruning.py | 37 +++++++++++++++++------------ bigframes/dtypes.py | 10 ++++++++ bigframes/session/__init__.py | 8 +++---- bigframes/session/planner.py | 39 ++++++++++++++++++++----------- tests/system/small/test_series.py | 13 +++++++++++ tests/unit/test_planner.py | 25 +++++++++++++++----- 7 files changed, 95 insertions(+), 39 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 2c730a8053..c64239d0d3 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2240,7 +2240,7 @@ def cached(self, *, force: bool = False, session_aware: bool = False) -> None: if (not force) and self.session._is_trivially_executable(self.expr): return elif session_aware: - self.session._session_aware_caching(self.expr) + self.session._cache_with_session_awareness(self.expr) else: self.session._cache_with_cluster_cols( self.expr, cluster_cols=self.index_columns diff --git a/bigframes/core/pruning.py b/bigframes/core/pruning.py index 90a78a8fb3..4dc8620c94 100644 --- a/bigframes/core/pruning.py +++ b/bigframes/core/pruning.py @@ -12,11 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Sequence - import bigframes.core.expression as ex +import bigframes.core.schema as schemata +import bigframes.dtypes import bigframes.operations as ops +LOW_CARDINALITY_TYPES = [bigframes.dtypes.BOOL_DTYPE] + COMPARISON_OP_TYPES = tuple( type(i) for i in ( @@ -31,31 +33,36 @@ ) -def cluster_cols_for_predicate(predicate: ex.Expression) -> Sequence[str]: +def cluster_cols_for_predicate( + predicate: ex.Expression, schema: schemata.ArraySchema +) -> list[str]: """Try to determine cluster col candidates that work with given predicates.""" # TODO: Prioritize based on predicted selectivity (eg. equality conditions are probably very selective) if isinstance(predicate, ex.UnboundVariableExpression): - return [predicate.id] - if isinstance(predicate, ex.OpExpression): + cols = [predicate.id] + elif isinstance(predicate, ex.OpExpression): op = predicate.op if isinstance(op, COMPARISON_OP_TYPES): - return cluster_cols_for_comparison(predicate.inputs[0], predicate.inputs[1]) - if isinstance(op, (type(ops.invert_op))): - return cluster_cols_for_predicate(predicate.inputs[0]) - if isinstance(op, (type(ops.and_op), type(ops.or_op))): - left_cols = cluster_cols_for_predicate(predicate.inputs[0]) - right_cols = cluster_cols_for_predicate(predicate.inputs[1]) - return [*left_cols, *[col for col in right_cols if col not in left_cols]] + cols = cluster_cols_for_comparison(predicate.inputs[0], predicate.inputs[1]) + elif isinstance(op, (type(ops.invert_op))): + cols = cluster_cols_for_predicate(predicate.inputs[0], schema) + elif isinstance(op, (type(ops.and_op), type(ops.or_op))): + left_cols = cluster_cols_for_predicate(predicate.inputs[0], schema) + right_cols = cluster_cols_for_predicate(predicate.inputs[1], schema) + cols = [*left_cols, *[col for col in right_cols if col not in left_cols]] else: - return [] + cols = [] else: # Constant - return [] + cols = [] + return [ + col for col in cols if bigframes.dtypes.is_clusterable(schema.get_type(col)) + ] def cluster_cols_for_comparison( left_ex: ex.Expression, right_ex: ex.Expression -) -> Sequence[str]: +) -> list[str]: # TODO: Try to normalize expressions such that one side is a single variable. # eg. Convert -cola>=3 to cola<-3 and colb+3 < 4 to colb < 1 if left_ex.is_const: diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 3df67ed9e4..61237c8788 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -165,6 +165,16 @@ def is_orderable(type: ExpressionType) -> bool: return not is_array_like(type) and not is_struct_like(type) and (type != GEO_DTYPE) +def is_clusterable(type: ExpressionType) -> bool: + # https://cloud.google.com/bigquery/docs/clustered-tables#cluster_column_types + # This is based on default database type mapping, could in theory represent in non-default bq type to cluster. + return ( + not is_array_like(type) + and not is_struct_like(type) + and (type not in (GEO_DTYPE, TIME_DTYPE, FLOAT_DTYPE)) + ) + + def is_bool_coercable(type: ExpressionType) -> bool: # TODO: Implement more bool coercions return (type is None) or is_numeric(type) or is_string_like(type) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 1d3290b380..7790597cca 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1921,15 +1921,15 @@ def _cache_with_offsets(self, array_value: core.ArrayValue): ).node self._cached_executions[array_value.node] = cached_replacement - def _session_aware_caching(self, array_value: core.ArrayValue) -> None: + def _cache_with_session_awareness(self, array_value: core.ArrayValue) -> None: # this is the occurence count across the whole session forest = [obj._block.expr.node for obj in self.objects] # These node types are cheap to re-compute - target, cluster_col = bigframes.session.planner.session_aware_cache_plan( + target, cluster_cols = bigframes.session.planner.session_aware_cache_plan( array_value.node, forest ) - if cluster_col: - self._cache_with_cluster_cols(core.ArrayValue(target), [cluster_col]) + if len(cluster_cols) > 0: + self._cache_with_cluster_cols(core.ArrayValue(target), cluster_cols) else: self._cache_with_offsets(core.ArrayValue(target)) diff --git a/bigframes/session/planner.py b/bigframes/session/planner.py index 9f266e3f1f..fc52ee236f 100644 --- a/bigframes/session/planner.py +++ b/bigframes/session/planner.py @@ -14,7 +14,8 @@ from __future__ import annotations -from typing import Optional, Sequence, Tuple +import itertools +from typing import Sequence, Tuple import bigframes.core.expression as ex import bigframes.core.nodes as nodes @@ -24,14 +25,14 @@ def session_aware_cache_plan( root: nodes.BigFrameNode, session_forest: Sequence[nodes.BigFrameNode] -) -> Tuple[nodes.BigFrameNode, Optional[str]]: +) -> Tuple[nodes.BigFrameNode, list[str]]: """ Determines the best node to cache given a target and a list of object roots for objects in a session. Returns the node to cache, and optionally a clustering column. """ node_counts = traversals.count_nodes(session_forest) - # These node types are cheap to re-compute + # These node types are cheap to re-compute, so it makes more sense to cache their children. de_cachable_types = (nodes.FilterNode, nodes.ProjectionNode) caching_target = cur_node = root caching_target_refs = node_counts.get(caching_target, 0) @@ -39,24 +40,36 @@ def session_aware_cache_plan( filters: list[ ex.Expression ] = [] # accumulate filters into this as traverse downwards - cluster_col: Optional[str] = None + clusterable_cols: set[str] = set() while isinstance(cur_node, de_cachable_types): if isinstance(cur_node, nodes.FilterNode): + # Filter node doesn't define any variables, so no need to chain expressions filters.append(cur_node.predicate) elif isinstance(cur_node, nodes.ProjectionNode): + # Projection defines the variables that are used in the filter expressions, need to substitute variables with their scalar expressions + # that instead reference variables in the child node. bindings = {name: expr for expr, name in cur_node.assignments} filters = [i.bind_all_variables(bindings) for i in filters] + else: + raise ValueError(f"Unexpected de-cached node: {cur_node}") cur_node = cur_node.child cur_node_refs = node_counts.get(cur_node, 0) if cur_node_refs > caching_target_refs: caching_target, caching_target_refs = cur_node, cur_node_refs - cluster_col = None - # Just pick the first cluster-compatible predicate - for predicate in filters: - # Cluster cols only consider the target object and not other sesssion objects - cluster_cols = predicate_pruning.cluster_cols_for_predicate(predicate) - if len(cluster_cols) > 0: - cluster_col = cluster_cols[0] - continue - return caching_target, cluster_col + schema = cur_node.schema + # Cluster cols only consider the target object and not other sesssion objects + # Note, this + clusterable_cols = set( + itertools.chain.from_iterable( + map( + lambda f: predicate_pruning.cluster_cols_for_predicate( + f, schema + ), + filters, + ) + ) + ) + # BQ supports up to 4 cluster columns, just prioritize by alphabetical ordering + # TODO: Prioritize caching columns by estimated filter selectivity + return caching_target, sorted(list(clusterable_cols))[:4] diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index a4c62c88ba..bf2d00578b 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -1979,6 +1979,19 @@ def test_series_peek(scalars_dfs): ) +def test_series_peek_multi_index(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + bf_series = scalars_df.set_index(["string_col", "bool_col"])["float64_col"] + bf_series.name = ("2-part", "name") + pd_series = scalars_pandas_df.set_index(["string_col", "bool_col"])["float64_col"] + pd_series.name = ("2-part", "name") + peek_result = bf_series.peek(n=3, force=False) + pd.testing.assert_series_equal( + peek_result, + pd_series.reindex_like(peek_result), + ) + + def test_series_peek_filtered(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs peek_result = scalars_df[scalars_df.int64_col > 0]["float64_col"].peek( diff --git a/tests/unit/test_planner.py b/tests/unit/test_planner.py index 5ef52deb22..7a71dc7377 100644 --- a/tests/unit/test_planner.py +++ b/tests/unit/test_planner.py @@ -34,30 +34,43 @@ def test_session_aware_caching_project_filter(): + """ + Test that if a node is filtered by a column, the node is cached pre-filter and clustered by the filter column. + """ session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] target = LEAF.assign_constant("col_c", 4, pd.Int64Dtype()).filter( ops.gt_op.as_expr("col_a", ex.const(3)) ) - result, cluster_col = planner.session_aware_cache_plan( + result, cluster_cols = planner.session_aware_cache_plan( target.node, [obj.node for obj in session_objects] ) assert result == LEAF.node - assert cluster_col == "col_a" + assert cluster_cols == ["col_a"] def test_session_aware_caching_unusable_filter(): + """ + Test that if a node is filtered by multiple columns in the same comparison, the node is cached pre-filter and not clustered by either column. + + Most filters with multiple column references cannot be used for scan pruning, as they cannot be converted to fixed value ranges. + """ session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] target = LEAF.assign_constant("col_c", 4, pd.Int64Dtype()).filter( ops.gt_op.as_expr("col_a", "col_b") ) - result, cluster_col = planner.session_aware_cache_plan( + result, cluster_cols = planner.session_aware_cache_plan( target.node, [obj.node for obj in session_objects] ) assert result == LEAF.node - assert cluster_col is None + assert cluster_cols == [] def test_session_aware_caching_fork_after_window_op(): + """ + Test that caching happens only after an windowed operation, but before filtering, projecting. + + Windowing is expensive, so caching should always compute the window function, in order to avoid later recomputation. + """ other = LEAF.promote_offsets("offsets_col").assign_constant( "col_d", 5, pd.Int64Dtype() ) @@ -68,11 +81,11 @@ def test_session_aware_caching_fork_after_window_op(): ops.eq_op.as_expr("col_a", ops.add_op.as_expr(ex.const(4), ex.const(3))) ) ) - result, cluster_col = planner.session_aware_cache_plan( + result, cluster_cols = planner.session_aware_cache_plan( target.node, [ other.node, ], ) assert result == LEAF.promote_offsets("offsets_col").node - assert cluster_col == "col_a" + assert cluster_cols == ["col_a"] From 936e73d01d74a2b8f465b0d3c4ec6fef984be933 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 4 Jun 2024 19:51:31 +0000 Subject: [PATCH 05/14] add another session caching test --- tests/unit/test_planner.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/unit/test_planner.py b/tests/unit/test_planner.py index 7a71dc7377..1f91218054 100644 --- a/tests/unit/test_planner.py +++ b/tests/unit/test_planner.py @@ -48,6 +48,27 @@ def test_session_aware_caching_project_filter(): assert cluster_cols == ["col_a"] +def test_session_aware_caching_project_multi_filter(): + """ + Test that if a node is filtered by multiple columns, all of them are in the cluster cols + """ + session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] + predicate_1a = ops.gt_op.as_expr("col_a", ex.const(3)) + predicate_1b = ops.lt_op.as_expr("col_a", ex.const(55)) + predicate_1 = ops.and_op.as_expr(predicate_1a, predicate_1b) + predicate_3 = ops.eq_op.as_expr("col_b", ex.const(1)) + target = ( + LEAF.filter(predicate_1) + .assign_constant("col_c", 4, pd.Int64Dtype()) + .filter(predicate_3) + ) + result, cluster_cols = planner.session_aware_cache_plan( + target.node, [obj.node for obj in session_objects] + ) + assert result == LEAF.node + assert cluster_cols == ["col_a", "col_b"] + + def test_session_aware_caching_unusable_filter(): """ Test that if a node is filtered by multiple columns in the same comparison, the node is cached pre-filter and not clustered by either column. From a9b16c411c0bce2d799f6d1dd16cfc2a39df91ad Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 5 Jun 2024 00:17:28 +0000 Subject: [PATCH 06/14] add todo for geo predicate detection --- bigframes/core/pruning.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bigframes/core/pruning.py b/bigframes/core/pruning.py index 4dc8620c94..55165a616c 100644 --- a/bigframes/core/pruning.py +++ b/bigframes/core/pruning.py @@ -42,6 +42,8 @@ def cluster_cols_for_predicate( cols = [predicate.id] elif isinstance(predicate, ex.OpExpression): op = predicate.op + # TODO: Support geo predicates, which support pruning if clustered (other than st_disjoint) + # https://cloud.google.com/bigquery/docs/reference/standard-sql/geography_functions if isinstance(op, COMPARISON_OP_TYPES): cols = cluster_cols_for_comparison(predicate.inputs[0], predicate.inputs[1]) elif isinstance(op, (type(ops.invert_op))): From ec1d9733a9ff9cdb62d57184ea371a6354903fa8 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 12 Jun 2024 22:41:07 +0000 Subject: [PATCH 07/14] add dtype clusterable and orderable property --- bigframes/dtypes.py | 73 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 14 deletions(-) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index a957154d6a..506c193816 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -74,52 +74,95 @@ class SimpleDtypeInfo: logical_bytes: int = ( 8 # this is approximate only, some types are variably sized, also, compression ) + orderable: bool = False + clusterable: bool = False # TODO: Missing BQ types: INTERVAL, JSON, RANGE # TODO: Add mappings to python types SIMPLE_TYPES = ( SimpleDtypeInfo( - dtype=INT_DTYPE, arrow_dtype=pa.int64(), type_kind=("INT64", "INTEGER") + dtype=INT_DTYPE, + arrow_dtype=pa.int64(), + type_kind=("INT64", "INTEGER"), + orderable=True, + clusterable=True, ), SimpleDtypeInfo( - dtype=FLOAT_DTYPE, arrow_dtype=pa.float64(), type_kind=("FLOAT64", "FLOAT") + dtype=FLOAT_DTYPE, + arrow_dtype=pa.float64(), + type_kind=("FLOAT64", "FLOAT"), + orderable=True, ), SimpleDtypeInfo( dtype=BOOL_DTYPE, arrow_dtype=pa.bool_(), type_kind=("BOOL", "BOOLEAN"), logical_bytes=1, + orderable=True, + clusterable=True, ), - SimpleDtypeInfo(dtype=STRING_DTYPE, arrow_dtype=pa.string(), type_kind=("STRING",)), SimpleDtypeInfo( - dtype=DATE_DTYPE, arrow_dtype=pa.date32(), type_kind=("DATE",), logical_bytes=4 + dtype=STRING_DTYPE, + arrow_dtype=pa.string(), + type_kind=("STRING",), + orderable=True, + clusterable=True, ), - SimpleDtypeInfo(dtype=TIME_DTYPE, arrow_dtype=pa.time64("us"), type_kind=("TIME",)), SimpleDtypeInfo( - dtype=DATETIME_DTYPE, arrow_dtype=pa.timestamp("us"), type_kind=("DATETIME",) + dtype=DATE_DTYPE, + arrow_dtype=pa.date32(), + type_kind=("DATE",), + logical_bytes=4, + orderable=True, + clusterable=True, + ), + SimpleDtypeInfo( + dtype=TIME_DTYPE, + arrow_dtype=pa.time64("us"), + type_kind=("TIME",), + orderable=True, + ), + SimpleDtypeInfo( + dtype=DATETIME_DTYPE, + arrow_dtype=pa.timestamp("us"), + type_kind=("DATETIME",), + orderable=True, + clusterable=True, ), SimpleDtypeInfo( dtype=TIMESTAMP_DTYPE, arrow_dtype=pa.timestamp("us", tz="UTC"), type_kind=("TIMESTAMP",), + orderable=True, + clusterable=True, + ), + SimpleDtypeInfo( + dtype=BYTES_DTYPE, arrow_dtype=pa.binary(), type_kind=("BYTES",), orderable=True ), - SimpleDtypeInfo(dtype=BYTES_DTYPE, arrow_dtype=pa.binary(), type_kind=("BYTES",)), SimpleDtypeInfo( dtype=NUMERIC_DTYPE, arrow_dtype=pa.decimal128(38, 9), type_kind=("NUMERIC",), logical_bytes=16, + orderable=True, + clusterable=True, ), SimpleDtypeInfo( dtype=BIGNUMERIC_DTYPE, arrow_dtype=pa.decimal256(76, 38), type_kind=("BIGNUMERIC",), logical_bytes=32, + orderable=True, + clusterable=True, ), # Geo has no corresponding arrow dtype SimpleDtypeInfo( - dtype=GEO_DTYPE, arrow_dtype=None, type_kind=("GEOGRAPHY",), logical_bytes=40 + dtype=GEO_DTYPE, + arrow_dtype=None, + type_kind=("GEOGRAPHY",), + logical_bytes=40, + clusterable=True, ), ) @@ -209,19 +252,21 @@ def is_comparable(type: ExpressionType) -> bool: return (type is not None) and is_orderable(type) +_ORDERABLE_SIMPLE_TYPES = set(mapping.dtype for mapping in SIMPLE_TYPES) + + def is_orderable(type: ExpressionType) -> bool: # On BQ side, ARRAY, STRUCT, GEOGRAPHY, JSON are not orderable - return not is_array_like(type) and not is_struct_like(type) and (type != GEO_DTYPE) + return type in _ORDERABLE_SIMPLE_TYPES + + +_CLUSTERABLE_SIMPLE_TYPES = set(mapping.dtype for mapping in SIMPLE_TYPES) def is_clusterable(type: ExpressionType) -> bool: # https://cloud.google.com/bigquery/docs/clustered-tables#cluster_column_types # This is based on default database type mapping, could in theory represent in non-default bq type to cluster. - return ( - not is_array_like(type) - and not is_struct_like(type) - and (type not in (GEO_DTYPE, TIME_DTYPE, FLOAT_DTYPE)) - ) + return type in _CLUSTERABLE_SIMPLE_TYPES def is_bool_coercable(type: ExpressionType) -> bool: From b917c71c43c2ffe2192b592244ad495c984f1cfe Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 13 Jun 2024 22:48:45 +0000 Subject: [PATCH 08/14] fix session aware caching unit tests --- tests/unit/test_planner.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/tests/unit/test_planner.py b/tests/unit/test_planner.py index 1f91218054..cd522fc1a4 100644 --- a/tests/unit/test_planner.py +++ b/tests/unit/test_planner.py @@ -13,23 +13,28 @@ # limitations under the License. from __future__ import annotations -import ibis +import google.cloud.bigquery import pandas as pd import bigframes.core as core import bigframes.core.expression as ex -import bigframes.core.ordering as bf_order +import bigframes.core.schema import bigframes.operations as ops import bigframes.session.planner as planner -ibis_table = ibis.memtable({"col_a": [1, 2, 3], "col_b": [4, 5, 6]}) -cols = [ibis_table["col_a"], ibis_table["col_b"]] -LEAF: core.ArrayValue = core.ArrayValue.from_ibis( +TABLE_REF = google.cloud.bigquery.TableReference.from_string("project.dataset.table") +SCHEMA = ( + google.cloud.bigquery.SchemaField("col_a", "INTEGER"), + google.cloud.bigquery.SchemaField("col_b", "INTEGER"), +) +TABLE = google.cloud.bigquery.Table( + table_ref=TABLE_REF, + schema=SCHEMA, +) +LEAF: core.ArrayValue = core.ArrayValue.from_table( session=None, # type: ignore - table=ibis_table, - columns=cols, - hidden_ordering_columns=[], - ordering=bf_order.ExpressionOrdering((bf_order.ascending_over("col_a"),)), + table=TABLE, + schema=bigframes.core.schema.ArraySchema.from_bq_table(TABLE), ) From 848d0a4c315b3df313f031391af93f650786686e Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 13 Jun 2024 22:58:30 +0000 Subject: [PATCH 09/14] mock session for planner test --- tests/unit/test_planner.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_planner.py b/tests/unit/test_planner.py index cd522fc1a4..2e276d0f1a 100644 --- a/tests/unit/test_planner.py +++ b/tests/unit/test_planner.py @@ -13,6 +13,8 @@ # limitations under the License. from __future__ import annotations +import unittest.mock as mock + import google.cloud.bigquery import pandas as pd @@ -31,8 +33,10 @@ table_ref=TABLE_REF, schema=SCHEMA, ) +FAKE_SESSION = mock.create_autospec(bigframes.Session, instance=True) +type(FAKE_SESSION)._strictly_ordered = mock.PropertyMock(return_value=True) LEAF: core.ArrayValue = core.ArrayValue.from_table( - session=None, # type: ignore + session=FAKE_SESSION, table=TABLE, schema=bigframes.core.schema.ArraySchema.from_bq_table(TABLE), ) From 79d05b5771c43e58f0adebde2c554af82193bbda Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 25 Jun 2024 17:43:30 +0000 Subject: [PATCH 10/14] fix offsets column name collision --- bigframes/session/__init__.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 7ca416cd33..25abe31801 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1874,15 +1874,16 @@ def _cache_with_offsets(self, array_value: core.ArrayValue): ) compiled_value = self._compile_ordered(array_value) + offset_col_name = bigframes.core.guid.generate_guid("bigframes_offsets") ibis_expr = compiled_value._to_ibis_expr( - ordering_mode="offset_col", order_col_name="bigframes_offsets" + ordering_mode="offset_col", order_col_name=offset_col_name ) tmp_table = self._ibis_to_temp_table( - ibis_expr, cluster_cols=["bigframes_offsets"], api_name="cached" + ibis_expr, cluster_cols=[offset_col_name], api_name="cached" ) cached_replacement = array_value.as_cached( cache_table=self.bqclient.get_table(tmp_table), - ordering=order.ExpressionOrdering.from_offset_col("bigframes_offsets"), + ordering=order.ExpressionOrdering.from_offset_col(offset_col_name), ).node self._cached_executions[array_value.node] = cached_replacement From 2ed1520899781f5acc0905867f7ad78854b36ea4 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Tue, 25 Jun 2024 15:18:06 -0700 Subject: [PATCH 11/14] Update bigframes/dtypes.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Tim Sweña (Swast) --- bigframes/dtypes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 506c193816..47e7d77bcf 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -252,7 +252,7 @@ def is_comparable(type: ExpressionType) -> bool: return (type is not None) and is_orderable(type) -_ORDERABLE_SIMPLE_TYPES = set(mapping.dtype for mapping in SIMPLE_TYPES) +_ORDERABLE_SIMPLE_TYPES = set(mapping.dtype for mapping in SIMPLE_TYPES if mapping.orderable) def is_orderable(type: ExpressionType) -> bool: From 1ff4f681d8b1ce7698d8924e2b64c68621687af4 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Tue, 25 Jun 2024 15:18:16 -0700 Subject: [PATCH 12/14] Update bigframes/dtypes.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Tim Sweña (Swast) --- bigframes/dtypes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 47e7d77bcf..dbce040aa6 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -260,7 +260,7 @@ def is_orderable(type: ExpressionType) -> bool: return type in _ORDERABLE_SIMPLE_TYPES -_CLUSTERABLE_SIMPLE_TYPES = set(mapping.dtype for mapping in SIMPLE_TYPES) +_CLUSTERABLE_SIMPLE_TYPES = set(mapping.dtype for mapping in SIMPLE_TYPES if mapping.clusterable) def is_clusterable(type: ExpressionType) -> bool: From 81e5a02af92bc0e2352b059300c7000ec63b3209 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 25 Jun 2024 22:23:59 +0000 Subject: [PATCH 13/14] add another series peek test --- bigframes/dtypes.py | 8 ++++++-- tests/system/small/test_series.py | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index dbce040aa6..5de8f896a9 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -252,7 +252,9 @@ def is_comparable(type: ExpressionType) -> bool: return (type is not None) and is_orderable(type) -_ORDERABLE_SIMPLE_TYPES = set(mapping.dtype for mapping in SIMPLE_TYPES if mapping.orderable) +_ORDERABLE_SIMPLE_TYPES = set( + mapping.dtype for mapping in SIMPLE_TYPES if mapping.orderable +) def is_orderable(type: ExpressionType) -> bool: @@ -260,7 +262,9 @@ def is_orderable(type: ExpressionType) -> bool: return type in _ORDERABLE_SIMPLE_TYPES -_CLUSTERABLE_SIMPLE_TYPES = set(mapping.dtype for mapping in SIMPLE_TYPES if mapping.clusterable) +_CLUSTERABLE_SIMPLE_TYPES = set( + mapping.dtype for mapping in SIMPLE_TYPES if mapping.clusterable +) def is_clusterable(type: ExpressionType) -> bool: diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index bf2d00578b..cb28686d59 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -2019,6 +2019,21 @@ def test_series_peek_force(scalars_dfs): ) +@skip_legacy_pandas +def test_series_peek_force_float(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + + cumsum_df = scalars_df[["int64_col", "float64_col"]].cumsum() + df_filtered = cumsum_df[cumsum_df.float64_col > 0]["float64_col"] + peek_result = df_filtered.peek(n=3, force=True) + pd_cumsum_df = scalars_pandas_df[["int64_col", "float64_col"]].cumsum() + pd_result = pd_cumsum_df[pd_cumsum_df.float64_col > 0]["float64_col"] + pd.testing.assert_series_equal( + peek_result, + pd_result.reindex_like(peek_result), + ) + + def test_shift(scalars_df_index, scalars_pandas_df_index): col_name = "int64_col" bf_result = scalars_df_index[col_name].shift().to_pandas() From e91dbb5919fc9094a632006822975f86e5cecafb Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 26 Jun 2024 16:37:26 +0000 Subject: [PATCH 14/14] remove partial comment --- bigframes/session/planner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bigframes/session/planner.py b/bigframes/session/planner.py index fc52ee236f..2a74521b43 100644 --- a/bigframes/session/planner.py +++ b/bigframes/session/planner.py @@ -59,7 +59,6 @@ def session_aware_cache_plan( caching_target, caching_target_refs = cur_node, cur_node_refs schema = cur_node.schema # Cluster cols only consider the target object and not other sesssion objects - # Note, this clusterable_cols = set( itertools.chain.from_iterable( map(