feat: add `GroupBy.__iter__` by tswast · Pull Request #1394 · googleapis/python-bigquery-dataframes · GitHub
6 changes: 6 additions & 0 deletions bigframes/core/blocks.py
@@ -1375,10 +1375,16 @@ def aggregate(
) -> typing.Tuple[Block, typing.Sequence[str]]:
"""
Apply aggregations to the block.

Arguments:
by_column_ids: column ids of the aggregation keys; these are preserved through the transform and used as the index.
aggregations: (input_column_id, operation) tuples
dropna: whether null keys should be dropped

Returns:
Tuple[Block, Sequence[str]]:
The first element is the grouped block. The second is the
column IDs corresponding to each applied aggregation.
"""
if column_labels is None:
column_labels = pd.Index(range(len(aggregations)))
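For reviewers, a hedged sketch of the documented contract. `Block.aggregate` is an internal API, so the argument values below are placeholders rather than calls taken from this PR:

    # Hypothetical: `block` is a bigframes Block; `aggregations` is a
    # sequence of (input_column_id, operation) tuples, per the docstring.
    grouped_block, agg_col_ids = block.aggregate(
        ["group_key_col"], aggregations=aggregations, dropna=True
    )
    # agg_col_ids[i] is the id of the output column produced by aggregations[i].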
18 changes: 16 additions & 2 deletions bigframes/core/groupby/dataframe_group_by.py
@@ -16,7 +16,7 @@

import datetime
import typing
from typing import Literal, Optional, Sequence, Tuple, Union
from typing import Iterable, Literal, Optional, Sequence, Tuple, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
@@ -29,7 +29,7 @@
from bigframes.core import log_adapter
import bigframes.core.block_transforms as block_ops
import bigframes.core.blocks as blocks
from bigframes.core.groupby import aggs, series_group_by
from bigframes.core.groupby import aggs, group_by, series_group_by
import bigframes.core.ordering as order
import bigframes.core.utils as utils
import bigframes.core.validations as validations
@@ -54,6 +54,7 @@ def __init__(
selected_cols: typing.Optional[typing.Sequence[str]] = None,
dropna: bool = True,
as_index: bool = True,
by_key_is_singular: bool = False,
):
# TODO(tbergeron): Support more group-by expression types
self._block = block
@@ -64,6 +65,9 @@
)
}
self._by_col_ids = by_col_ids
self._by_key_is_singular = by_key_is_singular
if by_key_is_singular:
assert len(by_col_ids) == 1, "singular key should be exactly one group key"

self._dropna = dropna
self._as_index = as_index
@@ -163,6 +167,16 @@ def describe(self, include: None | Literal["all"] = None):
)
)

def __iter__(self) -> Iterable[Tuple[blocks.Label, df.DataFrame]]:
for group_keys, filtered_block in group_by.block_groupby_iter(
self._block,
by_col_ids=self._by_col_ids,
by_key_is_singular=self._by_key_is_singular,
dropna=self._dropna,
):
filtered_df = df.DataFrame(filtered_block)
yield group_keys, filtered_df

def size(self) -> typing.Union[df.DataFrame, series.Series]:
agg_block, _ = self._block.aggregate_size(
by_column_ids=self._by_col_ids,
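With `__iter__` in place, pandas-style iteration over a grouped DataFrame works end to end. A minimal usage sketch (the data and column names are made up for illustration):

    import bigframes.pandas as bpd

    df = bpd.DataFrame({"species": ["a", "a", "b"], "value": [1, 2, 3]})
    for species, group_df in df.groupby("species"):
        # `species` is a scalar because the group key is singular; grouping
        # by ["species"] would yield 1-tuples instead.
        print(species, group_df["value"].sum())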
91 changes: 91 additions & 0 deletions bigframes/core/groupby/group_by.py
@@ -0,0 +1,91 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import functools
from typing import Sequence

import pandas as pd

from bigframes.core import blocks
from bigframes.core import expression as ex
import bigframes.enums
import bigframes.operations as ops


def block_groupby_iter(
block: blocks.Block,
*,
by_col_ids: Sequence[str],
by_key_is_singular: bool,
dropna: bool,
):
original_index_columns = block._index_columns
original_index_labels = block._index_labels
block = block.reset_index(
level=None,
# Keep the original index columns so they can be recovered.
drop=False,
allow_duplicates=True,
replacement=bigframes.enums.DefaultIndexKind.NULL,
).set_index(
by_col_ids,
# Keep by_col_ids in place so the ordering doesn't change.
drop=False,
append=False,
)
block.cached(
force=True,
# All DataFrames will be filtered by by_col_ids, so
# force block.cached() to cluster by the new index by explicitly
# setting `session_aware=False`. This will ensure that the filters
# are more efficient.
session_aware=False,
)
keys_block, _ = block.aggregate(by_col_ids, dropna=dropna)
for chunk in keys_block.to_pandas_batches():
# Convert to MultiIndex to make sure we get tuples,
# even for singular keys.
by_keys_index = chunk.index
if not isinstance(by_keys_index, pd.MultiIndex):
by_keys_index = pd.MultiIndex.from_frame(by_keys_index.to_frame())

for by_keys in by_keys_index:
filtered_block = (
# To ensure the cache is used, filter first, then reset the
# index before yielding the DataFrame.
block.filter(
functools.reduce(
ops.and_op.as_expr,
(
ops.eq_op.as_expr(by_col, ex.const(by_key))
for by_col, by_key in zip(by_col_ids, by_keys)
),
),
).set_index(
original_index_columns,
# We retained by_col_ids in the set_index call above,
# so it's safe to drop the duplicates now.
drop=True,
append=False,
index_labels=original_index_labels,
)
)

if by_key_is_singular:
yield by_keys[0], filtered_block
else:
yield by_keys, filtered_block
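Conceptually, `block_groupby_iter` computes the distinct keys once, then yields one filtered view per key. A pure-pandas sketch of the same loop, for intuition only; the real implementation pushes the equality filters down to BigQuery against the cached, re-clustered table:

    import functools
    import operator

    import pandas as pd

    def groupby_iter(frame: pd.DataFrame, by: list[str], dropna: bool = True):
        keys = frame[by].drop_duplicates()
        if dropna:
            keys = keys.dropna()
        for key in keys.itertuples(index=False, name=None):
            # AND together one equality predicate per key column, mirroring
            # the ops.eq_op / ops.and_op reduction above.
            mask = functools.reduce(
                operator.and_,
                (frame[col] == val for col, val in zip(by, key)),
            )
            yield key, frame[mask]

The `session_aware=False` cache call is the key performance choice here: it clusters the cached table by the new group-key index, so each per-key filter becomes a cheap clustered scan instead of a full pass over the table.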
23 changes: 21 additions & 2 deletions bigframes/core/groupby/series_group_by.py
@@ -16,7 +16,7 @@

import datetime
import typing
from typing import Literal, Sequence, Union
from typing import Iterable, Literal, Sequence, Tuple, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
@@ -28,7 +28,7 @@
from bigframes.core import log_adapter
import bigframes.core.block_transforms as block_ops
import bigframes.core.blocks as blocks
from bigframes.core.groupby import aggs
from bigframes.core.groupby import aggs, group_by
import bigframes.core.ordering as order
import bigframes.core.utils as utils
import bigframes.core.validations as validations
@@ -52,6 +52,8 @@ def __init__(
by_col_ids: typing.Sequence[str],
value_name: blocks.Label = None,
dropna=True,
*,
by_key_is_singular: bool = False,
):
# TODO(tbergeron): Support more group-by expression types
self._block = block
@@ -60,6 +62,10 @@
self._value_name = value_name
self._dropna = dropna # Applies to aggregations but not windowing

self._by_key_is_singular = by_key_is_singular
if by_key_is_singular:
assert len(by_col_ids) == 1, "singular key should be exactly one group key"

@property
def _session(self) -> session.Session:
return self._block.session
@@ -89,6 +95,19 @@ def describe(self, include: None | Literal["all"] = None):
)
).droplevel(level=0, axis=1)

def __iter__(self) -> Iterable[Tuple[blocks.Label, series.Series]]:
for group_keys, filtered_block in group_by.block_groupby_iter(
self._block,
by_col_ids=self._by_col_ids,
by_key_is_singular=self._by_key_is_singular,
dropna=self._dropna,
):
filtered_series = series.Series(
filtered_block.select_column(self._value_column)
)
filtered_series.name = self._value_name
yield group_keys, filtered_series

def all(self) -> series.Series:
return self._aggregate(agg_ops.all_op)

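The Series variant yields one named Series per key. A usage sketch with hypothetical data:

    import bigframes.pandas as bpd

    df = bpd.DataFrame({"species": ["a", "a", "b"], "value": [1, 2, 3]})
    for key, group in df["value"].groupby(df["species"]):
        # Each `group` is a bigframes Series named "value", restricted to
        # the rows whose key equals `key`.
        print(key, group.sum())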
11 changes: 11 additions & 0 deletions bigframes/dataframe.py
@@ -3913,11 +3913,17 @@ def _groupby_level(
as_index: bool = True,
dropna: bool = True,
):
if utils.is_list_like(level):
by_key_is_singular = False
else:
by_key_is_singular = True

return groupby.DataFrameGroupBy(
self._block,
by_col_ids=self._resolve_levels(level),
as_index=as_index,
dropna=dropna,
by_key_is_singular=by_key_is_singular,
)

def _groupby_series(
@@ -3930,10 +3936,14 @@ def _groupby_series(
as_index: bool = True,
dropna: bool = True,
):
# Pandas makes a distinction between groupby with a list of keys
# versus groupby with a single item in some methods, like __iter__.
if not isinstance(by, bigframes.series.Series) and utils.is_list_like(by):
by = list(by)
by_key_is_singular = False
else:
by = [typing.cast(typing.Union[blocks.Label, bigframes.series.Series], by)]
by_key_is_singular = True

block = self._block
col_ids: typing.Sequence[str] = []
@@ -3963,6 +3973,7 @@
by_col_ids=col_ids,
as_index=as_index,
dropna=dropna,
by_key_is_singular=by_key_is_singular,
)

def abs(self) -> DataFrame:
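This mirrors pandas, which yields scalar keys for a single grouper but tuple keys for a list of groupers, even a list of one:

    for key, _ in df.groupby("species"):
        ...  # key is a scalar, e.g. "a"

    for key, _ in df.groupby(["species"]):
        ...  # key is a 1-tuple, e.g. ("a",)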
9 changes: 9 additions & 0 deletions bigframes/series.py
@@ -1854,12 +1854,18 @@ def _groupby_level(
level: int | str | typing.Sequence[int] | typing.Sequence[str],
dropna: bool = True,
) -> bigframes.core.groupby.SeriesGroupBy:
if utils.is_list_like(level):
by_key_is_singular = False
else:
by_key_is_singular = True

return groupby.SeriesGroupBy(
self._block,
self._value_column,
by_col_ids=self._resolve_levels(level),
value_name=self.name,
dropna=dropna,
by_key_is_singular=by_key_is_singular,
)

def _groupby_values(
@@ -1871,8 +1877,10 @@
) -> bigframes.core.groupby.SeriesGroupBy:
if not isinstance(by, Series) and _is_list_like(by):
by = list(by)
by_key_is_singular = False
else:
by = [typing.cast(typing.Union[blocks.Label, Series], by)]
by_key_is_singular = True

block = self._block
grouping_cols: typing.Sequence[str] = []
@@ -1904,6 +1912,7 @@
by_col_ids=grouping_cols,
value_name=self.name,
dropna=dropna,
by_key_is_singular=by_key_is_singular,
)

def apply(
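The same rule applies to level-based grouping on Series per this change: a scalar `level` yields scalar keys, while a list-like `level` yields tuple keys. Sketch (index and names hypothetical):

    s = df.set_index("species")["value"]

    for key, group in s.groupby(level=0):
        ...  # key is a scalar index label

    for key, group in s.groupby(level=[0]):
        ...  # key is a 1-tuple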