feat: Add quantile statistic by TrevorBergeron · Pull Request #613 · googleapis/python-bigquery-dataframes · GitHub
3 changes: 3 additions & 0 deletions bigframes/constants.py
@@ -92,3 +92,6 @@
LEP_ENABLED_BIGQUERY_LOCATIONS = frozenset(
ALL_BIGQUERY_LOCATIONS - REP_ENABLED_BIGQUERY_LOCATIONS
)

# BigQuery default is 10000, leave 100 for overhead
MAX_COLUMNS = 9900
34 changes: 34 additions & 0 deletions bigframes/core/block_transforms.py
@@ -15,6 +15,7 @@

import functools
import typing
from typing import Sequence

import pandas as pd

@@ -105,6 +106,39 @@ def indicate_duplicates(
)


def quantile(
block: blocks.Block,
columns: Sequence[str],
qs: Sequence[float],
grouping_column_ids: Sequence[str] = (),
) -> blocks.Block:
# TODO: handle windowing and more interpolation methods
window = core.WindowSpec(
grouping_keys=tuple(grouping_column_ids),
)
quantile_cols = []
labels = []
if len(columns) * len(qs) > constants.MAX_COLUMNS:
raise NotImplementedError("Too many aggregates requested.")
for col in columns:
Contributor: Throws an exception for dataframe if the column count is larger than 30? (existing check: if len(self.value_columns) > 30:)

Contributor Author: added limit

for q in qs:
label = block.col_id_to_label[col]
new_label = (*label, q) if isinstance(label, tuple) else (label, q)
labels.append(new_label)
block, quantile_col = block.apply_window_op(
col,
agg_ops.QuantileOp(q),
window_spec=window,
)
quantile_cols.append(quantile_col)
block, results = block.aggregate(
grouping_column_ids,
tuple((col, agg_ops.AnyValueOp()) for col in quantile_cols),
dropna=True,
)
return block.select_columns(results).with_column_labels(labels)


def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block:
supported_methods = [
"linear",
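For intuition, a minimal plain-pandas sketch of the window-then-aggregate pattern the helper above uses: each quantile is computed as an analytic expression partitioned by the grouping columns, so every row in a partition carries the same value, and an ANY_VALUE-style aggregation then collapses each partition to one row. The dataframe and column names below are illustrative only, not bigframes internals.

import pandas as pd

df = pd.DataFrame({"g": ["a", "a", "b", "b"], "x": [1.0, 3.0, 10.0, 30.0]})

# Window step: the group quantile is broadcast back to every row of its group.
windowed = df.groupby("g")["x"].transform(lambda s: s.quantile(0.5))

# Aggregation step: any one value per group recovers that group's quantile.
result = windowed.groupby(df["g"]).first()
print(result)  # a -> 2.0, b -> 20.0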
13 changes: 9 additions & 4 deletions bigframes/core/blocks.py
@@ -1498,12 +1498,17 @@ def stack(self, how="left", levels: int = 1):

row_label_tuples = utils.index_as_tuples(row_labels)

- if col_labels is not None:
+ if col_labels is None:
+ result_index: pd.Index = pd.Index([None])
+ result_col_labels: Sequence[Tuple] = list([()])
+ elif (col_labels.nlevels == 1) and all(
+ col_labels.isna()
+ ):  # isna not implemented for MultiIndex for newer pandas versions
+ result_index = pd.Index([None])
+ result_col_labels = utils.index_as_tuples(col_labels.drop_duplicates())
+ else:
result_index = col_labels.drop_duplicates().dropna(how="all")
result_col_labels = utils.index_as_tuples(result_index)
- else:
- result_index = pd.Index([None])
- result_col_labels = list([()])

# Get matching columns
unpivot_columns: List[Tuple[str, List[str]]] = []
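The new elif branch exists because Index.isna() is only defined for flat indexes; calling it on a MultiIndex raises on newer pandas versions, hence the nlevels == 1 guard. A small plain-pandas illustration with made-up values:

import pandas as pd

flat = pd.Index([None, None])
print(flat.isna().all())  # True: isna() works on a flat Index

multi = pd.MultiIndex.from_tuples([("a", 1), ("b", 2)])
# multi.isna() raises NotImplementedError on recent pandas versions,
# which is why the nlevels == 1 guard precedes the isna() check above.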
8 changes: 8 additions & 0 deletions bigframes/core/compile/aggregate_compiler.py
@@ -148,6 +148,14 @@ def _(
return cast(ibis_types.NumericValue, value)


@compile_unary_agg.register
@numeric_op
def _(
op: agg_ops.QuantileOp, column: ibis_types.NumericColumn, window=None
) -> ibis_types.NumericValue:
return _apply_window_if_present(column.quantile(op.q), window)


@compile_unary_agg.register
@numeric_op
def _(
57 changes: 51 additions & 6 deletions bigframes/core/groupby/__init__.py
@@ -15,6 +15,7 @@
from __future__ import annotations

import typing
from typing import Sequence, Union

import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
import pandas as pd
@@ -115,14 +116,35 @@ def mean(self, numeric_only: bool = False, *args) -> df.DataFrame:
def median(
self, numeric_only: bool = False, *, exact: bool = False
) -> df.DataFrame:
- if exact:
- raise NotImplementedError(
- f"Only approximate median is supported. {constants.FEEDBACK_LINK}"
- )
if not numeric_only:
self._raise_on_non_numeric("median")
+ if exact:
+ return self.quantile(0.5)
return self._aggregate_all(agg_ops.median_op, numeric_only=True)

def quantile(
self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
) -> df.DataFrame:
if not numeric_only:
self._raise_on_non_numeric("quantile")
q_cols = tuple(
col
for col in self._selected_cols
if self._column_type(col) in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
)
multi_q = utils.is_list_like(q)
result = block_ops.quantile(
self._block,
q_cols,
qs=tuple(q) if multi_q else (q,), # type: ignore
grouping_column_ids=self._by_col_ids,
)
result_df = df.DataFrame(result)
if multi_q:
return result_df.stack()
else:
return result_df.droplevel(-1, 1)

def min(self, numeric_only: bool = False, *args) -> df.DataFrame:
return self._aggregate_all(agg_ops.min_op, numeric_only=numeric_only)

@@ -466,8 +488,31 @@ def sum(self, *args) -> series.Series:
def mean(self, *args) -> series.Series:
return self._aggregate(agg_ops.mean_op)

- def median(self, *args, **kwargs) -> series.Series:
- return self._aggregate(agg_ops.mean_op)
+ def median(
+ self,
+ *args,
+ exact: bool = False,
+ **kwargs,
+ ) -> series.Series:
+ if exact:
+ return self.quantile(0.5)
+ else:
+ return self._aggregate(agg_ops.median_op)

def quantile(
self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
) -> series.Series:
multi_q = utils.is_list_like(q)
result = block_ops.quantile(
self._block,
(self._value_column,),
qs=tuple(q) if multi_q else (q,), # type: ignore
grouping_column_ids=self._by_col_ids,
)
if multi_q:
return series.Series(result.stack())
else:
return series.Series(result.stack()).droplevel(-1)

def std(self, *args, **kwargs) -> series.Series:
return self._aggregate(agg_ops.std_op)
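Together, DataFrameGroupBy.quantile and SeriesGroupBy.quantile aim to mirror the pandas shapes: a scalar q yields one value per group, while a list of qs adds the quantile as an extra index level. A hypothetical usage sketch (requires a configured BigQuery session; the data and column names are made up):

import bigframes.pandas as bpd

df = bpd.DataFrame({"grp": ["a", "a", "b", "b"], "val": [1.0, 2.0, 10.0, 20.0]})

# Scalar q: one value per group.
print(df.groupby("grp")["val"].quantile(0.5).to_pandas())

# List of qs: the quantile becomes an extra index level, as in pandas.
print(df.groupby("grp")["val"].quantile([0.25, 0.75]).to_pandas())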
30 changes: 28 additions & 2 deletions bigframes/dataframe.py
@@ -2009,8 +2009,34 @@ def median(
frame = self._raise_on_non_numeric("median")
else:
frame = self._drop_non_numeric()
- block = frame._block.aggregate_all_and_stack(agg_ops.median_op)
- return bigframes.series.Series(block.select_column("values"))
+ if exact:
+ return self.quantile()
+ else:
+ block = frame._block.aggregate_all_and_stack(agg_ops.median_op)
+ return bigframes.series.Series(block.select_column("values"))

def quantile(
self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
):
if not numeric_only:
frame = self._raise_on_non_numeric("median")
else:
frame = self._drop_non_numeric()
multi_q = utils.is_list_like(q)
result = block_ops.quantile(
frame._block, frame._block.value_columns, qs=tuple(q) if multi_q else (q,) # type: ignore
)
if multi_q:
return DataFrame(result.stack()).droplevel(0)
else:
result_df = (
DataFrame(result)
.stack(list(range(0, frame.columns.nlevels)))
.droplevel(0)
)
result_series = bigframes.series.Series(result_df._block)
result_series.name = q
return result_series

def std(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
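For reference, the return shapes DataFrame.quantile targets match pandas: a scalar q produces a Series named after q and indexed by column label, while a list of qs produces a DataFrame indexed by the quantile values. A plain-pandas illustration with made-up data:

import pandas as pd

pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0], "b": [10.0, 20.0, 30.0, 40.0]})

s = pdf.quantile(0.45)          # Series named 0.45, indexed by column labels
print(s.name, s.to_dict())

f = pdf.quantile([0.25, 0.75])  # DataFrame indexed by the quantile values
print(f)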
12 changes: 12 additions & 0 deletions bigframes/operations/aggregations.py
@@ -109,6 +109,18 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
return input_types[0]


@dataclasses.dataclass(frozen=True)
class QuantileOp(UnaryAggregateOp):
q: float

@property
def name(self):
return f"{int(self.q*100)}%"

def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
return signatures.UNARY_REAL_NUMERIC.output_type(input_types[0])


@dataclasses.dataclass(frozen=True)
class ApproxQuartilesOp(UnaryAggregateOp):
quartile: int
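The name property above supplies the label attached to quantile result columns, e.g. "25%" for q=0.25 (note that int() truncates rather than rounds). A standalone sketch that mirrors the dataclass above, for illustration only:

import dataclasses

@dataclasses.dataclass(frozen=True)
class QuantileOp:
    q: float

    @property
    def name(self) -> str:
        return f"{int(self.q * 100)}%"

print(QuantileOp(0.25).name)  # 25%
print(QuantileOp(0.5).name)   # 50%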
19 changes: 14 additions & 5 deletions bigframes/series.py
@@ -23,7 +23,7 @@
import os
import textwrap
import typing
- from typing import Any, Literal, Mapping, Optional, Sequence, Tuple, Union
+ from typing import Any, cast, Literal, Mapping, Optional, Sequence, Tuple, Union

import bigframes_vendored.pandas.core.series as vendored_pandas_series
import google.cloud.bigquery as bigquery
@@ -968,10 +968,19 @@ def mean(self) -> float:

def median(self, *, exact: bool = False) -> float:
if exact:
- raise NotImplementedError(
- f"Only approximate median is supported. {constants.FEEDBACK_LINK}"
- )
- return typing.cast(float, self._apply_aggregation(agg_ops.median_op))
+ return typing.cast(float, self.quantile(0.5))
+ else:
+ return typing.cast(float, self._apply_aggregation(agg_ops.median_op))

def quantile(self, q: Union[float, Sequence[float]] = 0.5) -> Union[Series, float]:
qs = tuple(q) if utils.is_list_like(q) else (q,)
result = block_ops.quantile(self._block, (self._value_column,), qs=qs)
if utils.is_list_like(q):
result = result.stack()
result = result.drop_levels([result.index_columns[0]])
return Series(result)
else:
return cast(float, Series(result).to_pandas().squeeze())

def sum(self) -> float:
return typing.cast(float, self._apply_aggregation(agg_ops.sum_op))
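As written above, Series.quantile with a scalar q materializes immediately (to_pandas().squeeze() yields a Python float), while a list of qs stays deferred and comes back as a Series indexed by q. A hypothetical usage sketch (requires a configured BigQuery session; the data is made up):

import bigframes.pandas as bpd

s = bpd.Series([1.0, 2.0, 3.0, 4.0])

print(s.quantile(0.5))                       # a Python float
print(s.quantile([0.25, 0.75]).to_pandas())  # a Series indexed by 0.25 and 0.75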
30 changes: 29 additions & 1 deletion tests/system/small/test_dataframe.py
@@ -2504,7 +2504,10 @@ def test_df_melt_default(scalars_dfs):

# Pandas produces int64 index, Bigframes produces Int64 (nullable)
pd.testing.assert_frame_equal(
- bf_result, pd_result, check_index_type=False, check_dtype=False
+ bf_result,
+ pd_result,
+ check_index_type=False,
+ check_dtype=False,
)


@@ -3029,6 +3032,31 @@ def test_dataframe_aggregates_median(scalars_df_index, scalars_pandas_df_index):
)


def test_dataframe_aggregates_quantile_mono(scalars_df_index, scalars_pandas_df_index):
q = 0.45
col_names = ["int64_too", "int64_col", "float64_col"]
bf_result = scalars_df_index[col_names].quantile(q=q).to_pandas()
pd_result = scalars_pandas_df_index[col_names].quantile(q=q)

# Pandas may produce narrower numeric types, but bigframes always produces Float64
pd_result = pd_result.astype("Float64")

pd.testing.assert_series_equal(bf_result, pd_result, check_index_type=False)


def test_dataframe_aggregates_quantile_multi(scalars_df_index, scalars_pandas_df_index):
q = [0, 0.33, 0.67, 1.0]
col_names = ["int64_too", "int64_col", "float64_col"]
Contributor: maybe support the numeric_only parameter also?

Contributor Author: added

bf_result = scalars_df_index[col_names].quantile(q=q).to_pandas()
pd_result = scalars_pandas_df_index[col_names].quantile(q=q)

# Pandas may produce narrower numeric types, but bigframes always produces Float64
pd_result = pd_result.astype("Float64")
pd_result.index = pd_result.index.astype("Float64")

pd.testing.assert_frame_equal(bf_result, pd_result)


@pytest.mark.parametrize(
("op"),
[
35 changes: 35 additions & 0 deletions tests/system/small/test_groupby.py
@@ -65,6 +65,24 @@ def test_dataframe_groupby_median(scalars_df_index, scalars_pandas_df_index):
assert ((pd_min <= bf_result_computed) & (bf_result_computed <= pd_max)).all().all()


@pytest.mark.parametrize(
("q"),
[
([0.2, 0.4, 0.6, 0.8]),
(0.11),
],
)
def test_dataframe_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q):
col_names = ["int64_too", "float64_col", "int64_col", "string_col"]
bf_result = (
scalars_df_index[col_names].groupby("string_col").quantile(q)
).to_pandas()
pd_result = scalars_pandas_df_index[col_names].groupby("string_col").quantile(q)
pd.testing.assert_frame_equal(
pd_result, bf_result, check_dtype=False, check_index_type=False
)


@pytest.mark.parametrize(
("operator"),
[
@@ -389,3 +407,20 @@ def test_dataframe_groupby_nonnumeric_with_mean():
pd.testing.assert_frame_equal(
pd_result, bf_result, check_index_type=False, check_dtype=False
)


@pytest.mark.parametrize(
("q"),
[
([0.2, 0.4, 0.6, 0.8]),
(0.11),
],
)
def test_series_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q):
bf_result = (
scalars_df_index.groupby("string_col")["int64_col"].quantile(q)
).to_pandas()
pd_result = scalars_pandas_df_index.groupby("string_col")["int64_col"].quantile(q)
pd.testing.assert_series_equal(
pd_result, bf_result, check_dtype=False, check_index_type=False
)
21 changes: 21 additions & 0 deletions tests/system/small/test_series.py
@@ -1320,6 +1320,27 @@ def test_median(scalars_dfs):
assert pd_min < bf_result < pd_max


def test_median_exact(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
col_name = "int64_col"
bf_result = scalars_df[col_name].median(exact=True)
pd_result = scalars_pandas_df[col_name].median()
assert math.isclose(pd_result, bf_result)


def test_series_quantile(scalars_dfs):
scalars_df, scalars_pandas_df = scalars_dfs
col_name = "int64_col"
bf_series = scalars_df[col_name]
pd_series = scalars_pandas_df[col_name]

pd_result = pd_series.quantile([0.0, 0.4, 0.6, 1.0])
bf_result = bf_series.quantile([0.0, 0.4, 0.6, 1.0])
pd.testing.assert_series_equal(
pd_result, bf_result.to_pandas(), check_dtype=False, check_index_type=False
)


def test_numeric_literal(scalars_dfs):
scalars_df, _ = scalars_dfs
col_name = "numeric_col"