From 90c8e4593e4861c3ee45a4cd6bdacdd371b1e191 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 5 Sep 2025 22:07:02 +0000 Subject: [PATCH 1/3] feat: Can call agg with some callables --- bigframes/core/groupby/dataframe_group_by.py | 26 ++++++------ bigframes/core/groupby/series_group_by.py | 17 ++++---- bigframes/dataframe.py | 25 +++++------- bigframes/operations/aggregations.py | 42 ++++++++++++++------ bigframes/series.py | 6 +-- tests/system/small/test_dataframe.py | 17 +++++++- tests/system/small/test_groupby.py | 32 +++++++++------ 7 files changed, 97 insertions(+), 68 deletions(-) diff --git a/bigframes/core/groupby/dataframe_group_by.py b/bigframes/core/groupby/dataframe_group_by.py index 3f5480436a..7d3d3ada69 100644 --- a/bigframes/core/groupby/dataframe_group_by.py +++ b/bigframes/core/groupby/dataframe_group_by.py @@ -461,23 +461,19 @@ def expanding(self, min_periods: int = 1) -> windows.Window: def agg(self, func=None, **kwargs) -> typing.Union[df.DataFrame, series.Series]: if func: - if isinstance(func, str): - return self.size() if func == "size" else self._agg_string(func) - elif utils.is_dict_like(func): + if utils.is_dict_like(func): return self._agg_dict(func) elif utils.is_list_like(func): return self._agg_list(func) else: - raise NotImplementedError( - f"Aggregate with {func} not supported. {constants.FEEDBACK_LINK}" - ) + return self.size() if func == "size" else self._agg_func(func) else: return self._agg_named(**kwargs) - def _agg_string(self, func: str) -> df.DataFrame: + def _agg_func(self, func) -> df.DataFrame: ids, labels = self._aggregated_columns() aggregations = [ - aggs.agg(col_id, agg_ops.lookup_agg_func(func)) for col_id in ids + aggs.agg(col_id, agg_ops.lookup_agg_func(func)[0]) for col_id in ids ] agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, @@ -500,7 +496,7 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: funcs_for_id if utils.is_list_like(funcs_for_id) else [funcs_for_id] ) for f in func_list: - aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(f))) + aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(f)[0])) column_labels.append(label) agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, @@ -525,19 +521,23 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: def _agg_list(self, func: typing.Sequence) -> df.DataFrame: ids, labels = self._aggregated_columns() aggregations = [ - aggs.agg(col_id, agg_ops.lookup_agg_func(f)) for col_id in ids for f in func + aggs.agg(col_id, agg_ops.lookup_agg_func(f)[0]) + for col_id in ids + for f in func ] if self._block.column_labels.nlevels > 1: # Restructure MultiIndex for proper format: (idx1, idx2, func) # rather than ((idx1, idx2), func). column_labels = [ - tuple(label) + (f,) + tuple(label) + (agg_ops.lookup_agg_func(f)[1],) for label in labels.to_frame(index=False).to_numpy() for f in func ] else: # Single-level index - column_labels = [(label, f) for label in labels for f in func] + column_labels = [ + (label, agg_ops.lookup_agg_func(f)[1]) for label in labels for f in func + ] agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, @@ -563,7 +563,7 @@ def _agg_named(self, **kwargs) -> df.DataFrame: if not isinstance(v, tuple) or (len(v) != 2): raise TypeError("kwargs values must be 2-tuples of column, aggfunc") col_id = self._resolve_label(v[0]) - aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(v[1]))) + aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(v[1])[0])) column_labels.append(k) agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, diff --git a/bigframes/core/groupby/series_group_by.py b/bigframes/core/groupby/series_group_by.py index 7a8bdcb6cf..041cc1b3dd 100644 --- a/bigframes/core/groupby/series_group_by.py +++ b/bigframes/core/groupby/series_group_by.py @@ -216,18 +216,17 @@ def prod(self, *args) -> series.Series: def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: column_names: list[str] = [] - if isinstance(func, str): - aggregations = [aggs.agg(self._value_column, agg_ops.lookup_agg_func(func))] - column_names = [func] - elif utils.is_list_like(func): - aggregations = [ - aggs.agg(self._value_column, agg_ops.lookup_agg_func(f)) for f in func - ] - column_names = list(func) - else: + if utils.is_dict_like(func): raise NotImplementedError( f"Aggregate with {func} not supported. {constants.FEEDBACK_LINK}" ) + if not utils.is_list_like(func): + func = [func] + + aggregations = [ + aggs.agg(self._value_column, agg_ops.lookup_agg_func(f)[0]) for f in func + ] + column_names = [agg_ops.lookup_agg_func(f)[1] for f in func] agg_block, _ = self._block.aggregate( by_column_ids=self._by_col_ids, diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index c65bbdd2c8..60c685bf67 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3170,12 +3170,7 @@ def nunique(self) -> bigframes.series.Series: block = self._block.aggregate_all_and_stack(agg_ops.nunique_op) return bigframes.series.Series(block) - def agg( - self, - func: str - | typing.Sequence[str] - | typing.Mapping[blocks.Label, typing.Sequence[str] | str], - ) -> DataFrame | bigframes.series.Series: + def agg(self, func) -> DataFrame | bigframes.series.Series: if utils.is_dict_like(func): # Must check dict-like first because dictionaries are list-like # according to Pandas. @@ -3189,15 +3184,17 @@ def agg( if col_id is None: raise KeyError(f"Column {col_label} does not exist") for agg_func in agg_func_list: - agg_op = agg_ops.lookup_agg_func(typing.cast(str, agg_func)) + op_and_label = agg_ops.lookup_agg_func(agg_func) agg_expr = ( - agg_expressions.UnaryAggregation(agg_op, ex.deref(col_id)) - if isinstance(agg_op, agg_ops.UnaryAggregateOp) - else agg_expressions.NullaryAggregation(agg_op) + agg_expressions.UnaryAggregation( + op_and_label[0], ex.deref(col_id) + ) + if isinstance(op_and_label[0], agg_ops.UnaryAggregateOp) + else agg_expressions.NullaryAggregation(op_and_label[0]) ) aggs.append(agg_expr) labels.append(col_label) - funcnames.append(agg_func) + funcnames.append(op_and_label[1]) # if any list in dict values, format output differently if any(utils.is_list_like(v) for v in func.values()): @@ -3218,7 +3215,7 @@ def agg( ) ) elif utils.is_list_like(func): - aggregations = [agg_ops.lookup_agg_func(f) for f in func] + aggregations = [agg_ops.lookup_agg_func(f)[0] for f in func] for dtype, agg in itertools.product(self.dtypes, aggregations): agg.output_type( @@ -3234,9 +3231,7 @@ def agg( else: # function name string return bigframes.series.Series( - self._block.aggregate_all_and_stack( - agg_ops.lookup_agg_func(typing.cast(str, func)) - ) + self._block.aggregate_all_and_stack(agg_ops.lookup_agg_func(func)[0]) ) aggregate = agg diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 81ab18272c..3cdc0920cd 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -17,8 +17,9 @@ import abc import dataclasses import typing -from typing import ClassVar, Iterable, Optional, TYPE_CHECKING +from typing import Callable, ClassVar, Iterable, Optional, TYPE_CHECKING +import numpy as np import pandas as pd import pyarrow as pa @@ -661,7 +662,7 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT # TODO: Alternative names and lookup from numpy function objects -_AGGREGATIONS_LOOKUP: typing.Dict[ +_STRING_TO_AGG_OP: typing.Dict[ str, typing.Union[UnaryAggregateOp, NullaryAggregateOp] ] = { op.name: op @@ -688,17 +689,32 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT ] } +_CALLABLE_TO_AGG_OP: typing.Dict[ + Callable, typing.Union[UnaryAggregateOp, NullaryAggregateOp] +] = { + np.sum: sum_op, + np.mean: mean_op, + np.median: median_op, + np.prod: product_op, + np.max: max_op, + np.min: min_op, + np.std: std_op, + np.var: var_op, + np.all: all_op, + np.any: any_op, + np.unique: nunique_op, + # TODO(b/443252872): Solve + # list: ArrayAggOp(), + np.size: size_op, +} -def lookup_agg_func(key: str) -> typing.Union[UnaryAggregateOp, NullaryAggregateOp]: - if callable(key): - raise NotImplementedError( - "Aggregating with callable object not supported, pass method name as string instead (eg. 'sum' instead of np.sum)." - ) - if not isinstance(key, str): - raise ValueError( - f"Cannot aggregate using object of type: {type(key)}. Use string method name (eg. 'sum')" - ) - if key in _AGGREGATIONS_LOOKUP: - return _AGGREGATIONS_LOOKUP[key] + +def lookup_agg_func( + key, +) -> tuple[typing.Union[UnaryAggregateOp, NullaryAggregateOp], str]: + if key in _STRING_TO_AGG_OP: + return (_STRING_TO_AGG_OP[key], key) + if key in _CALLABLE_TO_AGG_OP: + return (_CALLABLE_TO_AGG_OP[key], key.__name__) else: raise ValueError(f"Unrecognize aggregate function: {key}") diff --git a/bigframes/series.py b/bigframes/series.py index 3e24a75d9b..e44cf417ab 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1330,7 +1330,7 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series: raise NotImplementedError( f"Multiple aggregations only supported on numeric series. {constants.FEEDBACK_LINK}" ) - aggregations = [agg_ops.lookup_agg_func(f) for f in func] + aggregations = [agg_ops.lookup_agg_func(f)[0] for f in func] return Series( self._block.summarize( [self._value_column], @@ -1338,9 +1338,7 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series: ) ) else: - return self._apply_aggregation( - agg_ops.lookup_agg_func(typing.cast(str, func)) - ) + return self._apply_aggregation(agg_ops.lookup_agg_func(func)[0]) aggregate = agg aggregate.__doc__ = inspect.getdoc(vendored_pandas_series.Series.agg) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 323956b038..9f20218671 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -6011,7 +6011,7 @@ def test_astype_invalid_type_fail(scalars_dfs): bf_df.astype(123) -def test_agg_with_dict_lists(scalars_dfs): +def test_agg_with_dict_lists_strings(scalars_dfs): bf_df, pd_df = scalars_dfs agg_funcs = { "int64_too": ["min", "max"], @@ -6026,6 +6026,21 @@ def test_agg_with_dict_lists(scalars_dfs): ) +def test_agg_with_dict_lists_callables(scalars_dfs): + bf_df, pd_df = scalars_dfs + agg_funcs = { + "int64_too": [np.min, np.max], + "int64_col": [np.min, np.var], + } + + bf_result = bf_df.agg(agg_funcs).to_pandas() + pd_result = pd_df.agg(agg_funcs) + + pd.testing.assert_frame_equal( + bf_result, pd_result, check_dtype=False, check_index_type=False + ) + + def test_agg_with_dict_list_and_str(scalars_dfs): bf_df, pd_df = scalars_dfs agg_funcs = { diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index 5c89363e9b..b940f93d48 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import numpy as np import pandas as pd import pytest @@ -223,7 +224,7 @@ def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): pd_result = ( scalars_pandas_df_index[col_names] .groupby("string_col") - .agg(["count", "min", "size"]) + .agg(["count", np.min, "size"]) ) bf_result_computed = bf_result.to_pandas() @@ -240,8 +241,8 @@ def test_dataframe_groupby_agg_list_w_column_multi_index( pd_df = scalars_pandas_df_index[columns].copy() pd_df.columns = multi_columns - bf_result = bf_df.groupby(level=0).agg(["count", "min", "size"]) - pd_result = pd_df.groupby(level=0).agg(["count", "min", "size"]) + bf_result = bf_df.groupby(level=0).agg(["count", np.min, "size"]) + pd_result = pd_df.groupby(level=0).agg(["count", np.min, "size"]) bf_result_computed = bf_result.to_pandas() pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False) @@ -261,12 +262,16 @@ def test_dataframe_groupby_agg_dict_with_list( bf_result = ( scalars_df_index[col_names] .groupby("string_col", as_index=as_index) - .agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"}) + .agg( + {"int64_too": [np.mean, np.max], "string_col": "count", "bool_col": "size"} + ) ) pd_result = ( scalars_pandas_df_index[col_names] .groupby("string_col", as_index=as_index) - .agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"}) + .agg( + {"int64_too": [np.mean, np.max], "string_col": "count", "bool_col": "size"} + ) ) bf_result_computed = bf_result.to_pandas() @@ -280,12 +285,12 @@ def test_dataframe_groupby_agg_dict_no_lists(scalars_df_index, scalars_pandas_df bf_result = ( scalars_df_index[col_names] .groupby("string_col") - .agg({"int64_too": "mean", "string_col": "count"}) + .agg({"int64_too": np.mean, "string_col": "count"}) ) pd_result = ( scalars_pandas_df_index[col_names] .groupby("string_col") - .agg({"int64_too": "mean", "string_col": "count"}) + .agg({"int64_too": np.mean, "string_col": "count"}) ) bf_result_computed = bf_result.to_pandas() @@ -298,7 +303,7 @@ def test_dataframe_groupby_agg_named(scalars_df_index, scalars_pandas_df_index): scalars_df_index[col_names] .groupby("string_col") .agg( - agg1=bpd.NamedAgg("int64_too", "sum"), + agg1=bpd.NamedAgg("int64_too", np.sum), agg2=bpd.NamedAgg("float64_col", "max"), ) ) @@ -306,7 +311,8 @@ def test_dataframe_groupby_agg_named(scalars_df_index, scalars_pandas_df_index): scalars_pandas_df_index[col_names] .groupby("string_col") .agg( - agg1=pd.NamedAgg("int64_too", "sum"), agg2=pd.NamedAgg("float64_col", "max") + agg1=pd.NamedAgg("int64_too", np.sum), + agg2=pd.NamedAgg("float64_col", "max"), ) ) bf_result_computed = bf_result.to_pandas() @@ -320,14 +326,14 @@ def test_dataframe_groupby_agg_kw_tuples(scalars_df_index, scalars_pandas_df_ind scalars_df_index[col_names] .groupby("string_col") .agg( - agg1=("int64_too", "sum"), + agg1=("int64_too", np.sum), agg2=("float64_col", "max"), ) ) pd_result = ( scalars_pandas_df_index[col_names] .groupby("string_col") - .agg(agg1=("int64_too", "sum"), agg2=("float64_col", "max")) + .agg(agg1=("int64_too", np.sum), agg2=("float64_col", "max")) ) bf_result_computed = bf_result.to_pandas() @@ -709,12 +715,12 @@ def test_series_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): bf_result = ( scalars_df_index["int64_col"] .groupby(scalars_df_index["string_col"]) - .agg(["sum", "mean", "size"]) + .agg(["sum", np.mean, "size"]) ) pd_result = ( scalars_pandas_df_index["int64_col"] .groupby(scalars_pandas_df_index["string_col"]) - .agg(["sum", "mean", "size"]) + .agg(["sum", np.mean, "size"]) ) bf_result_computed = bf_result.to_pandas() From e7e01a93d0b8641780e3f5cb5d9b541a13b95bf2 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 12 Sep 2025 00:42:19 +0000 Subject: [PATCH 2/3] deal with different agg labels --- tests/system/small/test_groupby.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index b940f93d48..b8a17bf501 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -219,7 +219,7 @@ def test_dataframe_groupby_agg_size_string(scalars_df_index, scalars_pandas_df_i def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"] bf_result = ( - scalars_df_index[col_names].groupby("string_col").agg(["count", "min", "size"]) + scalars_df_index[col_names].groupby("string_col").agg(["count", np.min, "size"]) ) pd_result = ( scalars_pandas_df_index[col_names] @@ -228,7 +228,11 @@ def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): ) bf_result_computed = bf_result.to_pandas() - pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False) + # some inconsistency between versions, so normalize to bigframes behavior + pd_result = pd_result.rename({"amin": "min"}, axis="columns") + pd.testing.assert_frame_equal( + pd_result, bf_result_computed, check_dtype=False, check_index_type=False + ) def test_dataframe_groupby_agg_list_w_column_multi_index( @@ -275,6 +279,8 @@ def test_dataframe_groupby_agg_dict_with_list( ) bf_result_computed = bf_result.to_pandas() + # some inconsistency between versions, so normalize to bigframes behavior + pd_result = pd_result.rename({"amax": "max"}, axis="columns") pd.testing.assert_frame_equal( pd_result, bf_result_computed, check_dtype=False, check_index_type=False ) From 580e9f42bea8fef3acf4087722c1e40595ce073d Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 12 Sep 2025 18:06:04 +0000 Subject: [PATCH 3/3] fix test --- tests/system/small/test_groupby.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index b8a17bf501..dba8d46676 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -230,6 +230,7 @@ def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): # some inconsistency between versions, so normalize to bigframes behavior pd_result = pd_result.rename({"amin": "min"}, axis="columns") + bf_result_computed = bf_result_computed.rename({"amin": "min"}, axis="columns") pd.testing.assert_frame_equal( pd_result, bf_result_computed, check_dtype=False, check_index_type=False )