feat: add .agg support for size by Genesis929 · Pull Request #792 · googleapis/python-bigquery-dataframes · GitHub
55 changes: 46 additions & 9 deletions bigframes/core/blocks.py
@@ -995,7 +995,7 @@ def filter(self, predicate: scalars.Expression):

def aggregate_all_and_stack(
self,
operation: agg_ops.UnaryAggregateOp,
operation: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp],
*,
axis: int | str = 0,
value_col_id: str = "values",
@@ -1004,7 +1004,12 @@ def aggregate_all_and_stack(
axis_n = utils.get_axis_number(axis)
if axis_n == 0:
aggregations = [
(ex.UnaryAggregation(operation, ex.free_var(col_id)), col_id)
(
ex.UnaryAggregation(operation, ex.free_var(col_id))
if isinstance(operation, agg_ops.UnaryAggregateOp)
else ex.NullaryAggregation(operation),
col_id,
)
for col_id in self.value_columns
]
index_id = guid.generate_guid()
@@ -1033,6 +1038,11 @@ def aggregate_all_and_stack(
(ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.free_var(col_id)), col_id)
for col_id in [*self.index_columns]
]
# TODO: may need to add NullaryAggregation to main_aggregation
# when agg adds support for axis=1 (needed for agg("size", axis=1))
assert isinstance(
operation, agg_ops.UnaryAggregateOp
), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)."
main_aggregation = (
ex.UnaryAggregation(operation, ex.free_var(value_col_id)),
value_col_id,
@@ -1125,7 +1135,11 @@ def remap_f(x):
def aggregate(
self,
by_column_ids: typing.Sequence[str] = (),
aggregations: typing.Sequence[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = (),
aggregations: typing.Sequence[
typing.Tuple[
str, typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
]
] = (),
*,
dropna: bool = True,
) -> typing.Tuple[Block, typing.Sequence[str]]:
@@ -1139,7 +1153,9 @@ def aggregate(
"""
agg_specs = [
(
ex.UnaryAggregation(operation, ex.free_var(input_id)),
ex.UnaryAggregation(operation, ex.free_var(input_id))
if isinstance(operation, agg_ops.UnaryAggregateOp)
else ex.NullaryAggregation(operation),
guid.generate_guid(),
)
for input_id, operation in aggregations
@@ -1175,18 +1191,32 @@ def aggregate(
output_col_ids,
)

def get_stat(self, column_id: str, stat: agg_ops.UnaryAggregateOp):
def get_stat(
self,
column_id: str,
stat: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp],
):
"""Gets aggregates immediately, and caches it"""
if stat.name in self._stats_cache[column_id]:
return self._stats_cache[column_id][stat.name]

# TODO: Convert nonstandard stats into standard stats where possible (popvar, etc.)
# if getting a standard stat, just go get the rest of them
standard_stats = self._standard_stats(column_id)
standard_stats = typing.cast(
typing.Sequence[
typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
],
self._standard_stats(column_id),
)
stats_to_fetch = standard_stats if stat in standard_stats else [stat]

aggregations = [
(ex.UnaryAggregation(stat, ex.free_var(column_id)), stat.name)
(
ex.UnaryAggregation(stat, ex.free_var(column_id))
if isinstance(stat, agg_ops.UnaryAggregateOp)
else ex.NullaryAggregation(stat),
stat.name,
)
for stat in stats_to_fetch
]
expr = self.expr.aggregate(aggregations)
@@ -1231,13 +1261,20 @@ def get_binary_stat(
def summarize(
self,
column_ids: typing.Sequence[str],
stats: typing.Sequence[agg_ops.UnaryAggregateOp],
stats: typing.Sequence[
typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
],
):
"""Get a list of stats as a deferred block object."""
label_col_id = guid.generate_guid()
labels = [stat.name for stat in stats]
aggregations = [
(ex.UnaryAggregation(stat, ex.free_var(col_id)), f"{col_id}-{stat.name}")
(
ex.UnaryAggregation(stat, ex.free_var(col_id))
if isinstance(stat, agg_ops.UnaryAggregateOp)
else ex.NullaryAggregation(stat),
f"{col_id}-{stat.name}",
)
for stat in stats
for col_id in column_ids
]
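Every hunk above follows the same pattern: the aggregation spec is built with ex.UnaryAggregation when the op consumes a column and ex.NullaryAggregation when it does not (as with size). A minimal, self-contained sketch of that dispatch, using simplified stand-in classes rather than the real bigframes expression types:

from dataclasses import dataclass
from typing import Union

# Simplified stand-ins for the bigframes aggregation-op / expression classes.
@dataclass(frozen=True)
class UnaryAggregateOp:
    name: str

@dataclass(frozen=True)
class NullaryAggregateOp:
    name: str

@dataclass(frozen=True)
class UnaryAggregation:
    op: UnaryAggregateOp
    column: str

@dataclass(frozen=True)
class NullaryAggregation:
    op: NullaryAggregateOp

def to_aggregation(
    op: Union[UnaryAggregateOp, NullaryAggregateOp], column: str
) -> Union[UnaryAggregation, NullaryAggregation]:
    # Unary ops (sum, mean, ...) bind to an input column; nullary ops (size)
    # ignore the column entirely and operate on whole rows.
    if isinstance(op, UnaryAggregateOp):
        return UnaryAggregation(op, column)
    return NullaryAggregation(op)

print(to_aggregation(UnaryAggregateOp("sum"), "int64_col"))
print(to_aggregation(NullaryAggregateOp("size"), "int64_col"))

The real blocks.py repeats this conditional inline at each call site, which is why the same isinstance() branch shows up in aggregate_all_and_stack, aggregate, get_stat, and summarize.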
10 changes: 7 additions & 3 deletions bigframes/core/groupby/__init__.py
@@ -286,10 +286,10 @@ def expanding(self, min_periods: int = 1) -> windows.Window:
block, window_spec, self._selected_cols, drop_null_groups=self._dropna
)

def agg(self, func=None, **kwargs) -> df.DataFrame:
def agg(self, func=None, **kwargs) -> typing.Union[df.DataFrame, series.Series]:
if func:
if isinstance(func, str):
return self._agg_string(func)
return self.size() if func == "size" else self._agg_string(func)
elif utils.is_dict_like(func):
return self._agg_dict(func)
elif utils.is_list_like(func):
@@ -315,7 +315,11 @@ def _agg_string(self, func: str) -> df.DataFrame:
return dataframe if self._as_index else self._convert_index(dataframe)

def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
aggregations: typing.List[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = []
aggregations: typing.List[
typing.Tuple[
str, typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp]
]
] = []
column_labels = []

want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values())
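For context, the pandas behavior the groupby changes aim to match: agg("size") on a grouped DataFrame reduces to one count per group and comes back as a Series, which is why agg() is now annotated as returning either a DataFrame or a Series. A pandas-only illustration (not bigframes code) of that asymmetry:

import pandas as pd

df = pd.DataFrame({"string_col": ["a", "a", "b"], "int64_col": [1, None, 3]})

# "size" counts rows per group (nulls included) and needs no input column,
# so the result is a Series; "count" stays per-column and yields a DataFrame.
print(df.groupby("string_col").agg("size"))
print(df.groupby("string_col").agg("count"))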
10 changes: 8 additions & 2 deletions bigframes/operations/aggregations.py
@@ -487,7 +487,9 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT


# TODO: Alternative names and lookup from numpy function objects
_AGGREGATIONS_LOOKUP: dict[str, UnaryAggregateOp] = {
_AGGREGATIONS_LOOKUP: typing.Dict[
str, typing.Union[UnaryAggregateOp, NullaryAggregateOp]
] = {
op.name: op
for op in [
sum_op,
@@ -506,10 +508,14 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
ApproxQuartilesOp(2),
ApproxQuartilesOp(3),
]
+ [
# Add size_op separately to avoid Mypy type inference errors.
size_op,
]
}


def lookup_agg_func(key: str) -> UnaryAggregateOp:
def lookup_agg_func(key: str) -> typing.Union[UnaryAggregateOp, NullaryAggregateOp]:
if callable(key):
raise NotImplementedError(
"Aggregating with callable object not supported, pass method name as string instead (eg. 'sum' instead of np.sum)."
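With size_op in the lookup table, lookup_agg_func can now hand back a nullary op, so callers have to branch on the op type. A hedged usage sketch, assuming an installed bigframes version that includes this change and the module path shown in this diff:

# Assumes bigframes at a version containing this PR.
from bigframes.operations import aggregations as agg_ops

op = agg_ops.lookup_agg_func("size")

# "size" now resolves to a NullaryAggregateOp rather than a UnaryAggregateOp,
# which is the isinstance() distinction the call sites in blocks.py rely on.
print(type(op).__name__)
print(isinstance(op, agg_ops.UnaryAggregateOp))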
5 changes: 3 additions & 2 deletions bigframes/series.py
@@ -968,7 +968,6 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series:
)
)
else:

return self._apply_aggregation(
agg_ops.lookup_agg_func(typing.cast(str, func))
)
@@ -1246,7 +1245,9 @@ def _align3(self, other1: Series | scalars.Scalar, other2: Series | scalars.Scal
values, index = self._align_n([other1, other2], how)
return (values[0], values[1], values[2], index)

def _apply_aggregation(self, op: agg_ops.UnaryAggregateOp) -> Any:
def _apply_aggregation(
self, op: agg_ops.UnaryAggregateOp | agg_ops.NullaryAggregateOp
) -> Any:
return self._block.get_stat(self._value_column, op)

def _apply_window_op(
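In pandas terms, the semantics Series._apply_aggregation now has to accommodate: a nullary "size" aggregation that ignores the column's values and counts every row, nulls included. A small pandas-only reference:

import pandas as pd

s = pd.Series([1, None, 3], dtype="Int64")

# "size" counts all rows; "count" only the non-null ones.
print(s.agg("size"))   # 3
print(s.agg("count"))  # 2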
14 changes: 11 additions & 3 deletions tests/system/small/test_dataframe.py
@@ -2485,12 +2485,19 @@ def test_dataframe_agg_single_string(scalars_dfs):
)


def test_dataframe_agg_int_single_string(scalars_dfs):
@pytest.mark.parametrize(
("agg",),
(
("sum",),
("size",),
),
)
def test_dataframe_agg_int_single_string(scalars_dfs, agg):
numeric_cols = ["int64_col", "int64_too", "bool_col"]
scalars_df, scalars_pandas_df = scalars_dfs

bf_result = scalars_df[numeric_cols].agg("sum").to_pandas()
pd_result = scalars_pandas_df[numeric_cols].agg("sum")
bf_result = scalars_df[numeric_cols].agg(agg).to_pandas()
pd_result = scalars_pandas_df[numeric_cols].agg(agg)

assert bf_result.dtype == "Int64"
pd.testing.assert_series_equal(
@@ -2537,6 +2544,7 @@ def test_dataframe_agg_int_multi_string(scalars_dfs):
"sum",
"nunique",
"count",
"size",
]
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = scalars_df[numeric_cols].agg(aggregations).to_pandas()
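The multi-string test checks the pandas-compatible shape of the result, where each aggregation label becomes a row and "size" repeats the total row count (nulls included) for every column. A pandas-only reference for the expected layout:

import pandas as pd

df = pd.DataFrame({"int64_col": [1, None, 3], "int64_too": [4, 5, 6]})

# Each aggregation label becomes an index entry; "size" is the row count,
# nulls included, repeated for every column.
print(df.agg(["sum", "count", "size"]))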
43 changes: 30 additions & 13 deletions tests/system/small/test_groupby.py
@@ -140,11 +140,23 @@ def test_dataframe_groupby_agg_string(
)


def test_dataframe_groupby_agg_size_string(scalars_df_index, scalars_pandas_df_index):
col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"]
bf_result = scalars_df_index[col_names].groupby("string_col").agg("size")
pd_result = scalars_pandas_df_index[col_names].groupby("string_col").agg("size")

pd.testing.assert_series_equal(pd_result, bf_result.to_pandas(), check_dtype=False)


def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"]
bf_result = scalars_df_index[col_names].groupby("string_col").agg(["count", "min"])
bf_result = (
scalars_df_index[col_names].groupby("string_col").agg(["count", "min", "size"])
)
pd_result = (
scalars_pandas_df_index[col_names].groupby("string_col").agg(["count", "min"])
scalars_pandas_df_index[col_names]
.groupby("string_col")
.agg(["count", "min", "size"])
)
bf_result_computed = bf_result.to_pandas()

@@ -161,8 +173,8 @@ def test_dataframe_groupby_agg_list_w_column_multi_index(
pd_df = scalars_pandas_df_index[columns].copy()
pd_df.columns = multi_columns

bf_result = bf_df.groupby(level=0).agg(["count", "min"])
pd_result = pd_df.groupby(level=0).agg(["count", "min"])
bf_result = bf_df.groupby(level=0).agg(["count", "min", "size"])
pd_result = pd_df.groupby(level=0).agg(["count", "min", "size"])

bf_result_computed = bf_result.to_pandas()
pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)
@@ -182,12 +194,12 @@ def test_dataframe_groupby_agg_dict_with_list(
bf_result = (
scalars_df_index[col_names]
.groupby("string_col", as_index=as_index)
.agg({"int64_too": ["mean", "max"], "string_col": "count"})
.agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"})
)
pd_result = (
scalars_pandas_df_index[col_names]
.groupby("string_col", as_index=as_index)
.agg({"int64_too": ["mean", "max"], "string_col": "count"})
.agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"})
)
bf_result_computed = bf_result.to_pandas()

@@ -413,16 +425,21 @@ def test_dataframe_groupby_nonnumeric_with_mean():
# ==============


def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index):
@pytest.mark.parametrize(
("agg"),
[
("count"),
("size"),
],
)
def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index, agg):
bf_result = (
scalars_df_index["int64_col"]
.groupby(scalars_df_index["string_col"])
.agg("count")
scalars_df_index["int64_col"].groupby(scalars_df_index["string_col"]).agg(agg)
)
pd_result = (
scalars_pandas_df_index["int64_col"]
.groupby(scalars_pandas_df_index["string_col"])
.agg("count")
.agg(agg)
)
bf_result_computed = bf_result.to_pandas()

@@ -435,12 +452,12 @@ def test_series_groupby_agg_list(scalars_df_index, scalars_pandas_df_index):
bf_result = (
scalars_df_index["int64_col"]
.groupby(scalars_df_index["string_col"])
.agg(["sum", "mean"])
.agg(["sum", "mean", "size"])
)
pd_result = (
scalars_pandas_df_index["int64_col"]
.groupby(scalars_pandas_df_index["string_col"])
.agg(["sum", "mean"])
.agg(["sum", "mean", "size"])
)
bf_result_computed = bf_result.to_pandas()

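The list and dict variants above lean on the corresponding pandas shapes: with a list of aggregations, "size" becomes one sub-column per input column under a MultiIndex; with a dict, it applies only to the named column. A pandas-only reference for both cases:

import pandas as pd

df = pd.DataFrame(
    {
        "string_col": ["a", "a", "b"],
        "int64_col": [1, None, 3],
        "bool_col": [True, False, True],
    }
)

# List form: each input column gets ("col", "count") and ("col", "size")
# sub-columns under a MultiIndex.
print(df.groupby("string_col").agg(["count", "size"]))

# Dict form: "size" is applied to the named column only.
print(df.groupby("string_col").agg({"int64_col": "count", "bool_col": "size"}))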
25 changes: 21 additions & 4 deletions tests/system/small/test_series.py
@@ -506,15 +506,32 @@ def test_series_dropna(scalars_dfs, ignore_index):
pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False)


def test_series_agg_single_string(scalars_dfs):
@pytest.mark.parametrize(
("agg",),
(
("sum",),
("size",),
),
)
def test_series_agg_single_string(scalars_dfs, agg):
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = scalars_df["int64_col"].agg("sum")
pd_result = scalars_pandas_df["int64_col"].agg("sum")
bf_result = scalars_df["int64_col"].agg(agg)
pd_result = scalars_pandas_df["int64_col"].agg(agg)
assert math.isclose(pd_result, bf_result)


def test_series_agg_multi_string(scalars_dfs):
aggregations = ["sum", "mean", "std", "var", "min", "max", "nunique", "count"]
aggregations = [
"sum",
"mean",
"std",
"var",
"min",
"max",
"nunique",
"count",
"size",
]
scalars_df, scalars_pandas_df = scalars_dfs
bf_result = scalars_df["int64_col"].agg(aggregations).to_pandas()
pd_result = scalars_pandas_df["int64_col"].agg(aggregations)
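The Series multi-aggregation test mixes "size" with reducing aggregations; in pandas the result is a Series indexed by the aggregation names, which is the shape the bigframes result is compared against. A pandas-only reference:

import pandas as pd

s = pd.Series([1, None, 3], dtype="Int64")

# One entry per aggregation name; "size" includes the null row, "count" does not.
print(s.agg(["sum", "count", "size"]))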